aboutsummaryrefslogtreecommitdiff
path: root/src/leveldb/db/db_impl.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/leveldb/db/db_impl.cc')
-rw-r--r--src/leveldb/db/db_impl.cc99
1 files changed, 57 insertions, 42 deletions
diff --git a/src/leveldb/db/db_impl.cc b/src/leveldb/db/db_impl.cc
index fa1351038b..faf5e7d7ba 100644
--- a/src/leveldb/db/db_impl.cc
+++ b/src/leveldb/db/db_impl.cc
@@ -133,8 +133,7 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
seed_(0),
tmp_batch_(new WriteBatch),
bg_compaction_scheduled_(false),
- manual_compaction_(NULL),
- consecutive_compaction_errors_(0) {
+ manual_compaction_(NULL) {
mem_->Ref();
has_imm_.Release_Store(NULL);
@@ -217,6 +216,12 @@ void DBImpl::MaybeIgnoreError(Status* s) const {
}
void DBImpl::DeleteObsoleteFiles() {
+ if (!bg_error_.ok()) {
+ // After a background error, we don't know whether a new version may
+ // or may not have been committed, so we cannot safely garbage collect.
+ return;
+ }
+
// Make a set of all of the live files
std::set<uint64_t> live = pending_outputs_;
versions_->AddLiveFiles(&live);
@@ -495,7 +500,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
return s;
}
-Status DBImpl::CompactMemTable() {
+void DBImpl::CompactMemTable() {
mutex_.AssertHeld();
assert(imm_ != NULL);
@@ -523,9 +528,9 @@ Status DBImpl::CompactMemTable() {
imm_ = NULL;
has_imm_.Release_Store(NULL);
DeleteObsoleteFiles();
+ } else {
+ RecordBackgroundError(s);
}
-
- return s;
}
void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
@@ -568,16 +573,18 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) {
}
MutexLock l(&mutex_);
- while (!manual.done) {
- while (manual_compaction_ != NULL) {
- bg_cv_.Wait();
- }
- manual_compaction_ = &manual;
- MaybeScheduleCompaction();
- while (manual_compaction_ == &manual) {
+ while (!manual.done && !shutting_down_.Acquire_Load() && bg_error_.ok()) {
+ if (manual_compaction_ == NULL) { // Idle
+ manual_compaction_ = &manual;
+ MaybeScheduleCompaction();
+ } else { // Running either my compaction or another compaction.
bg_cv_.Wait();
}
}
+ if (manual_compaction_ == &manual) {
+ // Cancel my manual compaction since we aborted early for some reason.
+ manual_compaction_ = NULL;
+ }
}
Status DBImpl::TEST_CompactMemTable() {
@@ -596,12 +603,22 @@ Status DBImpl::TEST_CompactMemTable() {
return s;
}
+void DBImpl::RecordBackgroundError(const Status& s) {
+ mutex_.AssertHeld();
+ if (bg_error_.ok()) {
+ bg_error_ = s;
+ bg_cv_.SignalAll();
+ }
+}
+
void DBImpl::MaybeScheduleCompaction() {
mutex_.AssertHeld();
if (bg_compaction_scheduled_) {
// Already scheduled
} else if (shutting_down_.Acquire_Load()) {
// DB is being deleted; no more background compactions
+ } else if (!bg_error_.ok()) {
+ // Already got an error; no more changes
} else if (imm_ == NULL &&
manual_compaction_ == NULL &&
!versions_->NeedsCompaction()) {
@@ -619,30 +636,12 @@ void DBImpl::BGWork(void* db) {
void DBImpl::BackgroundCall() {
MutexLock l(&mutex_);
assert(bg_compaction_scheduled_);
- if (!shutting_down_.Acquire_Load()) {
- Status s = BackgroundCompaction();
- if (s.ok()) {
- // Success
- consecutive_compaction_errors_ = 0;
- } else if (shutting_down_.Acquire_Load()) {
- // Error most likely due to shutdown; do not wait
- } else {
- // Wait a little bit before retrying background compaction in
- // case this is an environmental problem and we do not want to
- // chew up resources for failed compactions for the duration of
- // the problem.
- bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
- Log(options_.info_log, "Waiting after background compaction error: %s",
- s.ToString().c_str());
- mutex_.Unlock();
- ++consecutive_compaction_errors_;
- int seconds_to_sleep = 1;
- for (int i = 0; i < 3 && i < consecutive_compaction_errors_ - 1; ++i) {
- seconds_to_sleep *= 2;
- }
- env_->SleepForMicroseconds(seconds_to_sleep * 1000000);
- mutex_.Lock();
- }
+ if (shutting_down_.Acquire_Load()) {
+ // No more background work when shutting down.
+ } else if (!bg_error_.ok()) {
+ // No more background work after a background error.
+ } else {
+ BackgroundCompaction();
}
bg_compaction_scheduled_ = false;
@@ -653,11 +652,12 @@ void DBImpl::BackgroundCall() {
bg_cv_.SignalAll();
}
-Status DBImpl::BackgroundCompaction() {
+void DBImpl::BackgroundCompaction() {
mutex_.AssertHeld();
if (imm_ != NULL) {
- return CompactMemTable();
+ CompactMemTable();
+ return;
}
Compaction* c;
@@ -691,6 +691,9 @@ Status DBImpl::BackgroundCompaction() {
c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
f->smallest, f->largest);
status = versions_->LogAndApply(c->edit(), &mutex_);
+ if (!status.ok()) {
+ RecordBackgroundError(status);
+ }
VersionSet::LevelSummaryStorage tmp;
Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
static_cast<unsigned long long>(f->number),
@@ -701,6 +704,9 @@ Status DBImpl::BackgroundCompaction() {
} else {
CompactionState* compact = new CompactionState(c);
status = DoCompactionWork(compact);
+ if (!status.ok()) {
+ RecordBackgroundError(status);
+ }
CleanupCompaction(compact);
c->ReleaseInputs();
DeleteObsoleteFiles();
@@ -714,9 +720,6 @@ Status DBImpl::BackgroundCompaction() {
} else {
Log(options_.info_log,
"Compaction error: %s", status.ToString().c_str());
- if (options_.paranoid_checks && bg_error_.ok()) {
- bg_error_ = status;
- }
}
if (is_manual) {
@@ -732,7 +735,6 @@ Status DBImpl::BackgroundCompaction() {
}
manual_compaction_ = NULL;
}
- return status;
}
void DBImpl::CleanupCompaction(CompactionState* compact) {
@@ -1002,6 +1004,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
if (status.ok()) {
status = InstallCompactionResults(compact);
}
+ if (!status.ok()) {
+ RecordBackgroundError(status);
+ }
VersionSet::LevelSummaryStorage tmp;
Log(options_.info_log,
"compacted to: %s", versions_->LevelSummary(&tmp));
@@ -1185,13 +1190,23 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
{
mutex_.Unlock();
status = log_->AddRecord(WriteBatchInternal::Contents(updates));
+ bool sync_error = false;
if (status.ok() && options.sync) {
status = logfile_->Sync();
+ if (!status.ok()) {
+ sync_error = true;
+ }
}
if (status.ok()) {
status = WriteBatchInternal::InsertInto(updates, mem_);
}
mutex_.Lock();
+ if (sync_error) {
+ // The state of the log file is indeterminate: the log record we
+ // just added may or may not show up when the DB is re-opened.
+ // So we force the DB into a mode where all future writes fail.
+ RecordBackgroundError(status);
+ }
}
if (updates == tmp_batch_) tmp_batch_->Clear();