diff options
author | Pieter Wuille <pieter.wuille@gmail.com> | 2013-12-12 22:08:18 +0100 |
---|---|---|
committer | Pieter Wuille <pieter.wuille@gmail.com> | 2013-12-12 22:08:18 +0100 |
commit | ed873a301eeff8c021ba57c78fb702fd2b6f0452 (patch) | |
tree | 3fb5bcb91b8ceab464a50e63b6fce407eccb17e2 /src/leveldb/db/db_impl.cc | |
parent | fb34be6d5905d36a2d9a392d87aa73bdb9470a1b (diff) | |
parent | 55c68902946c8c9bacea748c159be456a7c9c180 (diff) |
Merge src/leveldb changes for LevelDB 1.15
Diffstat (limited to 'src/leveldb/db/db_impl.cc')
-rw-r--r-- | src/leveldb/db/db_impl.cc | 99 |
1 files changed, 57 insertions, 42 deletions
diff --git a/src/leveldb/db/db_impl.cc b/src/leveldb/db/db_impl.cc index fa1351038b..faf5e7d7ba 100644 --- a/src/leveldb/db/db_impl.cc +++ b/src/leveldb/db/db_impl.cc @@ -133,8 +133,7 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) seed_(0), tmp_batch_(new WriteBatch), bg_compaction_scheduled_(false), - manual_compaction_(NULL), - consecutive_compaction_errors_(0) { + manual_compaction_(NULL) { mem_->Ref(); has_imm_.Release_Store(NULL); @@ -217,6 +216,12 @@ void DBImpl::MaybeIgnoreError(Status* s) const { } void DBImpl::DeleteObsoleteFiles() { + if (!bg_error_.ok()) { + // After a background error, we don't know whether a new version may + // or may not have been committed, so we cannot safely garbage collect. + return; + } + // Make a set of all of the live files std::set<uint64_t> live = pending_outputs_; versions_->AddLiveFiles(&live); @@ -495,7 +500,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, return s; } -Status DBImpl::CompactMemTable() { +void DBImpl::CompactMemTable() { mutex_.AssertHeld(); assert(imm_ != NULL); @@ -523,9 +528,9 @@ Status DBImpl::CompactMemTable() { imm_ = NULL; has_imm_.Release_Store(NULL); DeleteObsoleteFiles(); + } else { + RecordBackgroundError(s); } - - return s; } void DBImpl::CompactRange(const Slice* begin, const Slice* end) { @@ -568,16 +573,18 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) { } MutexLock l(&mutex_); - while (!manual.done) { - while (manual_compaction_ != NULL) { - bg_cv_.Wait(); - } - manual_compaction_ = &manual; - MaybeScheduleCompaction(); - while (manual_compaction_ == &manual) { + while (!manual.done && !shutting_down_.Acquire_Load() && bg_error_.ok()) { + if (manual_compaction_ == NULL) { // Idle + manual_compaction_ = &manual; + MaybeScheduleCompaction(); + } else { // Running either my compaction or another compaction. bg_cv_.Wait(); } } + if (manual_compaction_ == &manual) { + // Cancel my manual compaction since we aborted early for some reason. + manual_compaction_ = NULL; + } } Status DBImpl::TEST_CompactMemTable() { @@ -596,12 +603,22 @@ Status DBImpl::TEST_CompactMemTable() { return s; } +void DBImpl::RecordBackgroundError(const Status& s) { + mutex_.AssertHeld(); + if (bg_error_.ok()) { + bg_error_ = s; + bg_cv_.SignalAll(); + } +} + void DBImpl::MaybeScheduleCompaction() { mutex_.AssertHeld(); if (bg_compaction_scheduled_) { // Already scheduled } else if (shutting_down_.Acquire_Load()) { // DB is being deleted; no more background compactions + } else if (!bg_error_.ok()) { + // Already got an error; no more changes } else if (imm_ == NULL && manual_compaction_ == NULL && !versions_->NeedsCompaction()) { @@ -619,30 +636,12 @@ void DBImpl::BGWork(void* db) { void DBImpl::BackgroundCall() { MutexLock l(&mutex_); assert(bg_compaction_scheduled_); - if (!shutting_down_.Acquire_Load()) { - Status s = BackgroundCompaction(); - if (s.ok()) { - // Success - consecutive_compaction_errors_ = 0; - } else if (shutting_down_.Acquire_Load()) { - // Error most likely due to shutdown; do not wait - } else { - // Wait a little bit before retrying background compaction in - // case this is an environmental problem and we do not want to - // chew up resources for failed compactions for the duration of - // the problem. - bg_cv_.SignalAll(); // In case a waiter can proceed despite the error - Log(options_.info_log, "Waiting after background compaction error: %s", - s.ToString().c_str()); - mutex_.Unlock(); - ++consecutive_compaction_errors_; - int seconds_to_sleep = 1; - for (int i = 0; i < 3 && i < consecutive_compaction_errors_ - 1; ++i) { - seconds_to_sleep *= 2; - } - env_->SleepForMicroseconds(seconds_to_sleep * 1000000); - mutex_.Lock(); - } + if (shutting_down_.Acquire_Load()) { + // No more background work when shutting down. + } else if (!bg_error_.ok()) { + // No more background work after a background error. + } else { + BackgroundCompaction(); } bg_compaction_scheduled_ = false; @@ -653,11 +652,12 @@ void DBImpl::BackgroundCall() { bg_cv_.SignalAll(); } -Status DBImpl::BackgroundCompaction() { +void DBImpl::BackgroundCompaction() { mutex_.AssertHeld(); if (imm_ != NULL) { - return CompactMemTable(); + CompactMemTable(); + return; } Compaction* c; @@ -691,6 +691,9 @@ Status DBImpl::BackgroundCompaction() { c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest, f->largest); status = versions_->LogAndApply(c->edit(), &mutex_); + if (!status.ok()) { + RecordBackgroundError(status); + } VersionSet::LevelSummaryStorage tmp; Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", static_cast<unsigned long long>(f->number), @@ -701,6 +704,9 @@ Status DBImpl::BackgroundCompaction() { } else { CompactionState* compact = new CompactionState(c); status = DoCompactionWork(compact); + if (!status.ok()) { + RecordBackgroundError(status); + } CleanupCompaction(compact); c->ReleaseInputs(); DeleteObsoleteFiles(); @@ -714,9 +720,6 @@ Status DBImpl::BackgroundCompaction() { } else { Log(options_.info_log, "Compaction error: %s", status.ToString().c_str()); - if (options_.paranoid_checks && bg_error_.ok()) { - bg_error_ = status; - } } if (is_manual) { @@ -732,7 +735,6 @@ Status DBImpl::BackgroundCompaction() { } manual_compaction_ = NULL; } - return status; } void DBImpl::CleanupCompaction(CompactionState* compact) { @@ -1002,6 +1004,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { if (status.ok()) { status = InstallCompactionResults(compact); } + if (!status.ok()) { + RecordBackgroundError(status); + } VersionSet::LevelSummaryStorage tmp; Log(options_.info_log, "compacted to: %s", versions_->LevelSummary(&tmp)); @@ -1185,13 +1190,23 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { { mutex_.Unlock(); status = log_->AddRecord(WriteBatchInternal::Contents(updates)); + bool sync_error = false; if (status.ok() && options.sync) { status = logfile_->Sync(); + if (!status.ok()) { + sync_error = true; + } } if (status.ok()) { status = WriteBatchInternal::InsertInto(updates, mem_); } mutex_.Lock(); + if (sync_error) { + // The state of the log file is indeterminate: the log record we + // just added may or may not show up when the DB is re-opened. + // So we force the DB into a mode where all future writes fail. + RecordBackgroundError(status); + } } if (updates == tmp_batch_) tmp_batch_->Clear(); |