Diffstat (limited to 'src/leveldb/db/db_impl.cc')
-rw-r--r-- | src/leveldb/db/db_impl.cc | 547
1 file changed, 267 insertions(+), 280 deletions(-)
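Note: this commit applies the upstream LevelDB C++11 modernization to the vendored copy: NULL becomes nullptr, the hand-rolled port::AtomicPointer becomes std::atomic with explicit memory orders, and several members (bg_cv_, bg_compaction_scheduled_) get more descriptive names. The atomic change recurs throughout the diff, so here is a minimal stand-alone sketch of the before/after pattern (illustrative names, not code from this patch):

    #include <atomic>

    // Before: port::AtomicPointer stored a void* and encoded a boolean as
    // nullptr vs. non-null:
    //   shutting_down_.Release_Store(this);          // set the flag
    //   if (shutting_down_.Acquire_Load()) { ... }   // test the flag
    // After: std::atomic<bool> with the same release/acquire pairing.
    std::atomic<bool> shutting_down{false};

    void BeginShutdown() {
      shutting_down.store(true, std::memory_order_release);
    }

    bool IsShuttingDown() {
      return shutting_down.load(std::memory_order_acquire);
    }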
diff --git a/src/leveldb/db/db_impl.cc b/src/leveldb/db/db_impl.cc index 3bb58e560a..65e31724bc 100644 --- a/src/leveldb/db/db_impl.cc +++ b/src/leveldb/db/db_impl.cc
@@ -4,12 +4,15 @@ #include "db/db_impl.h" +#include <stdint.h> +#include <stdio.h> + #include <algorithm> +#include <atomic> #include <set> #include <string> -#include <stdint.h> -#include <stdio.h> #include <vector> + #include "db/builder.h" #include "db/db_iter.h" #include "db/dbformat.h"
@@ -39,16 +42,33 @@ const int kNumNonTableCacheFiles = 10; // Information kept for every waiting writer struct DBImpl::Writer { + explicit Writer(port::Mutex* mu) + : batch(nullptr), sync(false), done(false), cv(mu) {} + Status status; WriteBatch* batch; bool sync; bool done; port::CondVar cv; - - explicit Writer(port::Mutex* mu) : cv(mu) { } }; struct DBImpl::CompactionState { + // Files produced by compaction + struct Output { + uint64_t number; + uint64_t file_size; + InternalKey smallest, largest; + }; + + Output* current_output() { return &outputs[outputs.size() - 1]; } + + explicit CompactionState(Compaction* c) + : compaction(c), + smallest_snapshot(0), + outfile(nullptr), + builder(nullptr), + total_bytes(0) {} + Compaction* const compaction; // Sequence numbers < smallest_snapshot are not significant since we
@@ -57,12 +77,6 @@ struct DBImpl::CompactionState { // we can drop all entries for the same key with sequence numbers < S. SequenceNumber smallest_snapshot; - // Files produced by compaction - struct Output { - uint64_t number; - uint64_t file_size; - InternalKey smallest, largest; - }; std::vector<Output> outputs; // State kept for output being generated
@@ -70,19 +84,10 @@ struct DBImpl::CompactionState { TableBuilder* builder; uint64_t total_bytes; - - Output* current_output() { return &outputs[outputs.size()-1]; } - - explicit CompactionState(Compaction* c) - : compaction(c), - outfile(NULL), - builder(NULL), - total_bytes(0) { - } }; // Fix user-supplied options to be reasonable -template <class T,class V> +template <class T, class V> static void ClipToRange(T* ptr, V minvalue, V maxvalue) { if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue; if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
@@ -93,27 +98,32 @@ Options SanitizeOptions(const std::string& dbname, const Options& src) { Options result = src; result.comparator = icmp; - result.filter_policy = (src.filter_policy != NULL) ? ipolicy : NULL; - ClipToRange(&result.max_open_files, 64 + kNumNonTableCacheFiles, 50000); - ClipToRange(&result.write_buffer_size, 64<<10, 1<<30); - ClipToRange(&result.max_file_size, 1<<20, 1<<30); - ClipToRange(&result.block_size, 1<<10, 4<<20); - if (result.info_log == NULL) { + result.filter_policy = (src.filter_policy != nullptr) ? ipolicy : nullptr; + ClipToRange(&result.max_open_files, 64 + kNumNonTableCacheFiles, 50000); + ClipToRange(&result.write_buffer_size, 64 << 10, 1 << 30); + ClipToRange(&result.max_file_size, 1 << 20, 1 << 30); + ClipToRange(&result.block_size, 1 << 10, 4 << 20); + if (result.info_log == nullptr) { // Open a log file in the same directory as the db src.env->CreateDir(dbname); // In case it does not exist src.env->RenameFile(InfoLogFileName(dbname), OldInfoLogFileName(dbname)); Status s = src.env->NewLogger(InfoLogFileName(dbname), &result.info_log); if (!s.ok()) { // No place suitable for logging - result.info_log = NULL; + result.info_log = nullptr; } } - if (result.block_cache == NULL) { + if (result.block_cache == nullptr) { result.block_cache = NewLRUCache(8 << 20); } return result; } +static int TableCacheSize(const Options& sanitized_options) { + // Reserve ten files or so for other uses and give the rest to TableCache. + return sanitized_options.max_open_files - kNumNonTableCacheFiles; +} + DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) : env_(raw_options.env), internal_comparator_(raw_options.comparator),
@@ -123,44 +133,39 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) owns_info_log_(options_.info_log != raw_options.info_log), owns_cache_(options_.block_cache != raw_options.block_cache), dbname_(dbname), - db_lock_(NULL), - shutting_down_(NULL), - bg_cv_(&mutex_), - mem_(NULL), - imm_(NULL), - logfile_(NULL), + table_cache_(new TableCache(dbname_, options_, TableCacheSize(options_))), + db_lock_(nullptr), + shutting_down_(false), + background_work_finished_signal_(&mutex_), + mem_(nullptr), + imm_(nullptr), + has_imm_(false), + logfile_(nullptr), logfile_number_(0), - log_(NULL), + log_(nullptr), seed_(0), tmp_batch_(new WriteBatch), - bg_compaction_scheduled_(false), - manual_compaction_(NULL) { - has_imm_.Release_Store(NULL); - - // Reserve ten files or so for other uses and give the rest to TableCache. - const int table_cache_size = options_.max_open_files - kNumNonTableCacheFiles; - table_cache_ = new TableCache(dbname_, &options_, table_cache_size); - - versions_ = new VersionSet(dbname_, &options_, table_cache_, - &internal_comparator_); -} + background_compaction_scheduled_(false), + manual_compaction_(nullptr), + versions_(new VersionSet(dbname_, &options_, table_cache_, + &internal_comparator_)) {} DBImpl::~DBImpl() { - // Wait for background work to finish + // Wait for background work to finish. mutex_.Lock(); - shutting_down_.Release_Store(this); // Any non-NULL value is ok - while (bg_compaction_scheduled_) { - bg_cv_.Wait(); + shutting_down_.store(true, std::memory_order_release); + while (background_compaction_scheduled_) { + background_work_finished_signal_.Wait(); } mutex_.Unlock(); - if (db_lock_ != NULL) { + if (db_lock_ != nullptr) { env_->UnlockFile(db_lock_); } delete versions_; - if (mem_ != NULL) mem_->Unref(); - if (imm_ != NULL) imm_->Unref(); + if (mem_ != nullptr) mem_->Unref(); + if (imm_ != nullptr) imm_->Unref(); delete tmp_batch_; delete log_; delete logfile_;
@@ -216,6 +221,8 @@ void DBImpl::MaybeIgnoreError(Status* s) const { } void DBImpl::DeleteObsoleteFiles() { + mutex_.AssertHeld(); + if (!bg_error_.ok()) { // After a background error, we don't know whether a new version may // or may not have been committed, so we cannot safely garbage collect.
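Note: the next hunk reworks DeleteObsoleteFiles() into a collect-then-release shape: candidate file names are gathered while mutex_ is held, and the slow Env::DeleteFile calls run with the lock dropped, which is safe because the doomed names are unique and cannot collide with newly created files. A minimal sketch of the pattern, using std::mutex and made-up helpers rather than the real DBImpl members:

    #include <mutex>
    #include <string>
    #include <vector>

    std::mutex mu;
    std::vector<std::string> GatherDeletableLocked() { return {}; }  // stub
    void DeleteOneFile(const std::string& name) { /* slow file I/O */ }

    void DeleteObsolete() {
      std::unique_lock<std::mutex> lock(mu);
      std::vector<std::string> doomed = GatherDeletableLocked();
      lock.unlock();  // unique names: deletion cannot race a new file
      for (const std::string& name : doomed) DeleteOneFile(name);
      lock.lock();    // re-acquire to preserve the caller's expectations
    }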
@@ -227,11 +234,12 @@ void DBImpl::DeleteObsoleteFiles() { versions_->AddLiveFiles(&live); std::vector<std::string> filenames; - env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose + env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose uint64_t number; FileType type; - for (size_t i = 0; i < filenames.size(); i++) { - if (ParseFileName(filenames[i], &number, &type)) { + std::vector<std::string> files_to_delete; + for (std::string& filename : filenames) { + if (ParseFileName(filename, &number, &type)) { bool keep = true; switch (type) { case kLogFile: @@ -259,26 +267,34 @@ void DBImpl::DeleteObsoleteFiles() { } if (!keep) { + files_to_delete.push_back(std::move(filename)); if (type == kTableFile) { table_cache_->Evict(number); } - Log(options_.info_log, "Delete type=%d #%lld\n", - int(type), + Log(options_.info_log, "Delete type=%d #%lld\n", static_cast<int>(type), static_cast<unsigned long long>(number)); - env_->DeleteFile(dbname_ + "/" + filenames[i]); } } } + + // While deleting all files unblock other threads. All files being deleted + // have unique names which will not collide with newly created files and + // are therefore safe to delete while allowing other threads to proceed. + mutex_.Unlock(); + for (const std::string& filename : files_to_delete) { + env_->DeleteFile(dbname_ + "/" + filename); + } + mutex_.Lock(); } -Status DBImpl::Recover(VersionEdit* edit, bool *save_manifest) { +Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) { mutex_.AssertHeld(); // Ignore error from CreateDir since the creation of the DB is // committed only when the descriptor is created, and this directory // may already exist from a previous failed creation attempt. env_->CreateDir(dbname_); - assert(db_lock_ == NULL); + assert(db_lock_ == nullptr); Status s = env_->LockFile(LockFileName(dbname_), &db_lock_); if (!s.ok()) { return s; @@ -296,8 +312,8 @@ Status DBImpl::Recover(VersionEdit* edit, bool *save_manifest) { } } else { if (options_.error_if_exists) { - return Status::InvalidArgument( - dbname_, "exists (error_if_exists is true)"); + return Status::InvalidArgument(dbname_, + "exists (error_if_exists is true)"); } } @@ -369,12 +385,12 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log, Env* env; Logger* info_log; const char* fname; - Status* status; // NULL if options_.paranoid_checks==false - virtual void Corruption(size_t bytes, const Status& s) { + Status* status; // null if options_.paranoid_checks==false + void Corruption(size_t bytes, const Status& s) override { Log(info_log, "%s%s: dropping %d bytes; %s", - (this->status == NULL ? "(ignoring error) " : ""), - fname, static_cast<int>(bytes), s.ToString().c_str()); - if (this->status != NULL && this->status->ok()) *this->status = s; + (this->status == nullptr ? "(ignoring error) " : ""), fname, + static_cast<int>(bytes), s.ToString().c_str()); + if (this->status != nullptr && this->status->ok()) *this->status = s; } }; @@ -394,32 +410,30 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log, reporter.env = env_; reporter.info_log = options_.info_log; reporter.fname = fname.c_str(); - reporter.status = (options_.paranoid_checks ? &status : NULL); + reporter.status = (options_.paranoid_checks ? &status : nullptr); // We intentionally make log::Reader do checksumming even if // paranoid_checks==false so that corruptions cause entire commits // to be skipped instead of propagating bad information (like overly // large sequence numbers). 
- log::Reader reader(file, &reporter, true/*checksum*/, - 0/*initial_offset*/); + log::Reader reader(file, &reporter, true /*checksum*/, 0 /*initial_offset*/); Log(options_.info_log, "Recovering log #%llu", - (unsigned long long) log_number); + (unsigned long long)log_number); // Read all the records and add to a memtable std::string scratch; Slice record; WriteBatch batch; int compactions = 0; - MemTable* mem = NULL; - while (reader.ReadRecord(&record, &scratch) && - status.ok()) { + MemTable* mem = nullptr; + while (reader.ReadRecord(&record, &scratch) && status.ok()) { if (record.size() < 12) { - reporter.Corruption( - record.size(), Status::Corruption("log record too small", fname)); + reporter.Corruption(record.size(), + Status::Corruption("log record too small", fname)); continue; } WriteBatchInternal::SetContents(&batch, record); - if (mem == NULL) { + if (mem == nullptr) { mem = new MemTable(internal_comparator_); mem->Ref(); } @@ -428,9 +442,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log, if (!status.ok()) { break; } - const SequenceNumber last_seq = - WriteBatchInternal::Sequence(&batch) + - WriteBatchInternal::Count(&batch) - 1; + const SequenceNumber last_seq = WriteBatchInternal::Sequence(&batch) + + WriteBatchInternal::Count(&batch) - 1; if (last_seq > *max_sequence) { *max_sequence = last_seq; } @@ -438,9 +451,9 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log, if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) { compactions++; *save_manifest = true; - status = WriteLevel0Table(mem, edit, NULL); + status = WriteLevel0Table(mem, edit, nullptr); mem->Unref(); - mem = NULL; + mem = nullptr; if (!status.ok()) { // Reflect errors immediately so that conditions like full // file-systems cause the DB::Open() to fail. @@ -453,31 +466,31 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log, // See if we should keep reusing the last log file. if (status.ok() && options_.reuse_logs && last_log && compactions == 0) { - assert(logfile_ == NULL); - assert(log_ == NULL); - assert(mem_ == NULL); + assert(logfile_ == nullptr); + assert(log_ == nullptr); + assert(mem_ == nullptr); uint64_t lfile_size; if (env_->GetFileSize(fname, &lfile_size).ok() && env_->NewAppendableFile(fname, &logfile_).ok()) { Log(options_.info_log, "Reusing old log %s \n", fname.c_str()); log_ = new log::Writer(logfile_, lfile_size); logfile_number_ = log_number; - if (mem != NULL) { + if (mem != nullptr) { mem_ = mem; - mem = NULL; + mem = nullptr; } else { - // mem can be NULL if lognum exists but was empty. + // mem can be nullptr if lognum exists but was empty. mem_ = new MemTable(internal_comparator_); mem_->Ref(); } } } - if (mem != NULL) { + if (mem != nullptr) { // mem did not get reused; compact it. 
if (status.ok()) { *save_manifest = true; - status = WriteLevel0Table(mem, edit, NULL); + status = WriteLevel0Table(mem, edit, nullptr); } mem->Unref(); }
@@ -494,7 +507,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, pending_outputs_.insert(meta.number); Iterator* iter = mem->NewIterator(); Log(options_.info_log, "Level-0 table #%llu: started", - (unsigned long long) meta.number); + (unsigned long long)meta.number); Status s; {
@@ -504,24 +517,22 @@ } Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s", - (unsigned long long) meta.number, - (unsigned long long) meta.file_size, + (unsigned long long)meta.number, (unsigned long long)meta.file_size, s.ToString().c_str()); delete iter; pending_outputs_.erase(meta.number); - // Note that if file_size is zero, the file has been deleted and // should not be added to the manifest. int level = 0; if (s.ok() && meta.file_size > 0) { const Slice min_user_key = meta.smallest.user_key(); const Slice max_user_key = meta.largest.user_key(); - if (base != NULL) { + if (base != nullptr) { level = base->PickLevelForMemTableOutput(min_user_key, max_user_key); } - edit->AddFile(level, meta.number, meta.file_size, - meta.smallest, meta.largest); + edit->AddFile(level, meta.number, meta.file_size, meta.smallest, + meta.largest); } CompactionStats stats;
@@ -533,7 +544,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, void DBImpl::CompactMemTable() { mutex_.AssertHeld(); - assert(imm_ != NULL); + assert(imm_ != nullptr); // Save the contents of the memtable as a new Table VersionEdit edit;
@@ -542,7 +553,7 @@ void DBImpl::CompactMemTable() { Status s = WriteLevel0Table(imm_, &edit, base); base->Unref(); - if (s.ok() && shutting_down_.Acquire_Load()) { + if (s.ok() && shutting_down_.load(std::memory_order_acquire)) { s = Status::IOError("Deleting DB during memtable compaction"); }
@@ -556,8 +567,8 @@ if (s.ok()) { // Commit to the new state imm_->Unref(); - imm_ = NULL; - has_imm_.Release_Store(NULL); + imm_ = nullptr; + has_imm_.store(false, std::memory_order_release); DeleteObsoleteFiles(); } else { RecordBackgroundError(s);
@@ -575,13 +586,14 @@ void DBImpl::CompactRange(const Slice* begin, const Slice* end) { } } } - TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap + TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap for (int level = 0; level < max_level_with_files; level++) { TEST_CompactRange(level, begin, end); } } -void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) { +void DBImpl::TEST_CompactRange(int level, const Slice* begin, + const Slice* end) { assert(level >= 0); assert(level + 1 < config::kNumLevels);
@@ -590,44 +602,45 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) { ManualCompaction manual; manual.level = level; manual.done = false; - if (begin == NULL) { - manual.begin = NULL; + if (begin == nullptr) { + manual.begin = nullptr; } else { begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek); manual.begin = &begin_storage; } - if (end == NULL) { - manual.end = NULL; + if (end == nullptr) { + manual.end = nullptr; } else { end_storage = InternalKey(*end, 0, static_cast<ValueType>(0)); manual.end = &end_storage; } MutexLock l(&mutex_); - while (!manual.done && !shutting_down_.Acquire_Load() && bg_error_.ok()) { - if (manual_compaction_ == NULL) { // Idle + while (!manual.done && !shutting_down_.load(std::memory_order_acquire) && + bg_error_.ok()) { + if (manual_compaction_ == nullptr) { // Idle manual_compaction_ = &manual; MaybeScheduleCompaction(); } else { // Running either my compaction or another compaction. - bg_cv_.Wait(); + background_work_finished_signal_.Wait(); } } if (manual_compaction_ == &manual) { // Cancel my manual compaction since we aborted early for some reason. - manual_compaction_ = NULL; + manual_compaction_ = nullptr; } } Status DBImpl::TEST_CompactMemTable() { - // NULL batch means just wait for earlier writes to be done - Status s = Write(WriteOptions(), NULL); + // nullptr batch means just wait for earlier writes to be done + Status s = Write(WriteOptions(), nullptr); if (s.ok()) { // Wait until the compaction completes MutexLock l(&mutex_); - while (imm_ != NULL && bg_error_.ok()) { - bg_cv_.Wait(); + while (imm_ != nullptr && bg_error_.ok()) { + background_work_finished_signal_.Wait(); } - if (imm_ != NULL) { + if (imm_ != nullptr) { s = bg_error_; } }
@@ -638,24 +651,23 @@ void DBImpl::RecordBackgroundError(const Status& s) { mutex_.AssertHeld(); if (bg_error_.ok()) { bg_error_ = s; - bg_cv_.SignalAll(); + background_work_finished_signal_.SignalAll(); } } void DBImpl::MaybeScheduleCompaction() { mutex_.AssertHeld(); - if (bg_compaction_scheduled_) { + if (background_compaction_scheduled_) { // Already scheduled - } else if (shutting_down_.Acquire_Load()) { + } else if (shutting_down_.load(std::memory_order_acquire)) { // DB is being deleted; no more background compactions } else if (!bg_error_.ok()) { // Already got an error; no more changes - } else if (imm_ == NULL && - manual_compaction_ == NULL && + } else if (imm_ == nullptr && manual_compaction_ == nullptr && !versions_->NeedsCompaction()) { // No work to be done } else { - bg_compaction_scheduled_ = true; + background_compaction_scheduled_ = true; env_->Schedule(&DBImpl::BGWork, this); } }
@@ -666,8 +678,8 @@ void DBImpl::BGWork(void* db) { } void DBImpl::BackgroundCall() { MutexLock l(&mutex_); - assert(bg_compaction_scheduled_); - if (shutting_down_.Acquire_Load()) { + assert(background_compaction_scheduled_); + if (shutting_down_.load(std::memory_order_acquire)) { // No more background work when shutting down. } else if (!bg_error_.ok()) { // No more background work after a background error.
@@ -675,36 +687,35 @@ BackgroundCompaction(); } - bg_compaction_scheduled_ = false; + background_compaction_scheduled_ = false; // Previous compaction may have produced too many files in a level, // so reschedule another compaction if needed. MaybeScheduleCompaction(); - bg_cv_.SignalAll(); + background_work_finished_signal_.SignalAll(); } void DBImpl::BackgroundCompaction() { mutex_.AssertHeld(); - if (imm_ != NULL) { + if (imm_ != nullptr) { CompactMemTable(); return; } Compaction* c; - bool is_manual = (manual_compaction_ != NULL); + bool is_manual = (manual_compaction_ != nullptr); InternalKey manual_end; if (is_manual) { ManualCompaction* m = manual_compaction_; c = versions_->CompactRange(m->level, m->begin, m->end); - m->done = (c == NULL); - if (c != NULL) { + m->done = (c == nullptr); + if (c != nullptr) { manual_end = c->input(0, c->num_input_files(0) - 1)->largest; } Log(options_.info_log, "Manual compaction at level-%d from %s .. %s; will stop at %s\n", - m->level, - (m->begin ? m->begin->DebugString().c_str() : "(begin)"), + m->level, (m->begin ? m->begin->DebugString().c_str() : "(begin)"), (m->end ? m->end->DebugString().c_str() : "(end)"), (m->done ? "(end)" : manual_end.DebugString().c_str())); } else {
@@ -712,26 +723,24 @@ } Status status; - if (c == NULL) { + if (c == nullptr) { // Nothing to do } else if (!is_manual && c->IsTrivialMove()) { // Move file to next level assert(c->num_input_files(0) == 1); FileMetaData* f = c->input(0, 0); c->edit()->DeleteFile(c->level(), f->number); - c->edit()->AddFile(c->level() + 1, f->number, f->file_size, - f->smallest, f->largest); + c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest, + f->largest); status = versions_->LogAndApply(c->edit(), &mutex_); if (!status.ok()) { RecordBackgroundError(status); } VersionSet::LevelSummaryStorage tmp; Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", - static_cast<unsigned long long>(f->number), - c->level() + 1, + static_cast<unsigned long long>(f->number), c->level() + 1, static_cast<unsigned long long>(f->file_size), - status.ToString().c_str(), - versions_->LevelSummary(&tmp)); + status.ToString().c_str(), versions_->LevelSummary(&tmp)); } else { CompactionState* compact = new CompactionState(c); status = DoCompactionWork(compact);
@@ -746,11 +755,10 @@ if (status.ok()) { // Done - } else if (shutting_down_.Acquire_Load()) { + } else if (shutting_down_.load(std::memory_order_acquire)) { // Ignore compaction errors found during shutting down } else { - Log(options_.info_log, - "Compaction error: %s", status.ToString().c_str()); + Log(options_.info_log, "Compaction error: %s", status.ToString().c_str()); } if (is_manual) {
@@ -764,18 +772,18 @@ m->tmp_storage = manual_end; m->begin = &m->tmp_storage; } - manual_compaction_ = NULL; + manual_compaction_ = nullptr; } } void DBImpl::CleanupCompaction(CompactionState* compact) { mutex_.AssertHeld(); - if (compact->builder != NULL) { + if (compact->builder != nullptr) { // May happen if we get a shutdown call in the middle of compaction compact->builder->Abandon(); delete compact->builder; } else { - assert(compact->outfile == NULL); + assert(compact->outfile == nullptr); } delete compact->outfile; for (size_t i = 0; i < compact->outputs.size(); i++) {
@@ -786,8 +794,8 @@ } Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { - assert(compact != NULL); - assert(compact->builder == NULL); + assert(compact != nullptr); + assert(compact->builder == nullptr); uint64_t file_number; { mutex_.Lock();
@@ -812,9 +820,9 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact, Iterator* input) { - assert(compact != NULL); - assert(compact->outfile != NULL); - assert(compact->builder != NULL); + assert(compact != nullptr); + assert(compact->outfile != nullptr); + assert(compact->builder != nullptr); const uint64_t output_number = compact->current_output()->number; assert(output_number != 0);
@@ -831,7 +839,7 @@ compact->current_output()->file_size = current_bytes; compact->total_bytes += current_bytes; delete compact->builder; - compact->builder = NULL; + compact->builder = nullptr; // Finish and check for file errors if (s.ok()) {
@@ -841,35 +849,29 @@ s = compact->outfile->Close(); } delete compact->outfile; - compact->outfile = NULL; + compact->outfile = nullptr; if (s.ok() && current_entries > 0) { // Verify that the table is usable - Iterator* iter = table_cache_->NewIterator(ReadOptions(), - output_number, - current_bytes); + Iterator* iter = + table_cache_->NewIterator(ReadOptions(), output_number, current_bytes); s = iter->status(); delete iter; if (s.ok()) { - Log(options_.info_log, - "Generated table #%llu@%d: %lld keys, %lld bytes", - (unsigned long long) output_number, - compact->compaction->level(), - (unsigned long long) current_entries, - (unsigned long long) current_bytes); + Log(options_.info_log, "Generated table #%llu@%d: %lld keys, %lld bytes", + (unsigned long long)output_number, compact->compaction->level(), + (unsigned long long)current_entries, + (unsigned long long)current_bytes); } } return s; } - Status DBImpl::InstallCompactionResults(CompactionState* compact) { mutex_.AssertHeld(); - Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes", - compact->compaction->num_input_files(0), - compact->compaction->level(), - compact->compaction->num_input_files(1), - compact->compaction->level() + 1, + Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes", + compact->compaction->num_input_files(0), compact->compaction->level(), + compact->compaction->num_input_files(1), compact->compaction->level() + 1, static_cast<long long>(compact->total_bytes)); // Add compaction outputs
@@ -877,9 +879,8 @@ const int level = compact->compaction->level(); for (size_t i = 0; i < compact->outputs.size(); i++) { const CompactionState::Output& out = compact->outputs[i]; - compact->compaction->edit()->AddFile( - level + 1, - out.number, out.file_size, out.smallest, out.largest); + compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size, + out.smallest, out.largest); } return versions_->LogAndApply(compact->compaction->edit(), &mutex_); }
@@ -888,39 +889,40 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { const uint64_t start_micros = env_->NowMicros(); int64_t imm_micros = 0; // Micros spent doing imm_ compactions - Log(options_.info_log, "Compacting %d@%d + %d@%d files", - compact->compaction->num_input_files(0), - compact->compaction->level(), + Log(options_.info_log, "Compacting %d@%d + %d@%d files", + compact->compaction->num_input_files(0), compact->compaction->level(), compact->compaction->num_input_files(1), compact->compaction->level() + 1); assert(versions_->NumLevelFiles(compact->compaction->level()) > 0); - assert(compact->builder == NULL); - assert(compact->outfile == NULL); + assert(compact->builder == nullptr); + assert(compact->outfile == nullptr); if (snapshots_.empty()) { compact->smallest_snapshot = versions_->LastSequence(); } else { - compact->smallest_snapshot = snapshots_.oldest()->number_; + compact->smallest_snapshot = snapshots_.oldest()->sequence_number(); } + Iterator* input = versions_->MakeInputIterator(compact->compaction); + // Release mutex while we're actually doing the compaction work mutex_.Unlock(); - Iterator* input = versions_->MakeInputIterator(compact->compaction); input->SeekToFirst(); Status status; ParsedInternalKey ikey; std::string current_user_key; bool has_current_user_key = false; SequenceNumber last_sequence_for_key = kMaxSequenceNumber; - for (; input->Valid() && !shutting_down_.Acquire_Load(); ) { + while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) { // Prioritize immutable compaction work - if (has_imm_.NoBarrier_Load() != NULL) { + if (has_imm_.load(std::memory_order_relaxed)) { const uint64_t imm_start = env_->NowMicros(); mutex_.Lock(); - if (imm_ != NULL) { + if (imm_ != nullptr) { CompactMemTable(); - bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary + // Wake up MakeRoomForWrite() if necessary. + background_work_finished_signal_.SignalAll(); } mutex_.Unlock(); imm_micros += (env_->NowMicros() - imm_start);
@@ -928,7 +930,7 @@ Slice key = input->key(); if (compact->compaction->ShouldStopBefore(key) && - compact->builder != NULL) { + compact->builder != nullptr) { status = FinishCompactionOutputFile(compact, input); if (!status.ok()) { break;
@@ -944,8 +946,8 @@ last_sequence_for_key = kMaxSequenceNumber; } else { if (!has_current_user_key || - user_comparator()->Compare(ikey.user_key, - Slice(current_user_key)) != 0) { + user_comparator()->Compare(ikey.user_key, Slice(current_user_key)) != + 0) { // First occurrence of this user key current_user_key.assign(ikey.user_key.data(), ikey.user_key.size()); has_current_user_key = true;
@@ -954,7 +956,7 @@ if (last_sequence_for_key <= compact->smallest_snapshot) { // Hidden by an newer entry for same user key - drop = true; // (A) + drop = true; // (A) } else if (ikey.type == kTypeDeletion && ikey.sequence <= compact->smallest_snapshot && compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
@@ -982,7 +984,7 @@ if (!drop) { // Open output file if necessary - if (compact->builder == NULL) { + if (compact->builder == nullptr) { status = OpenCompactionOutputFile(compact); if (!status.ok()) { break;
@@ -1007,17 +1009,17 @@ input->Next(); } - if (status.ok() && shutting_down_.Acquire_Load()) { + if (status.ok() && shutting_down_.load(std::memory_order_acquire)) { status = Status::IOError("Deleting DB during compaction"); } - if (status.ok() && compact->builder != NULL) { + if (status.ok() && compact->builder != nullptr) { status = FinishCompactionOutputFile(compact, input); } if (status.ok()) { status = input->status(); } delete input; - input = NULL; + input = nullptr; CompactionStats stats; stats.micros = env_->NowMicros() - start_micros - imm_micros;
@@ -1040,34 +1042,37 @@ RecordBackgroundError(status); } VersionSet::LevelSummaryStorage tmp; - Log(options_.info_log, - "compacted to: %s", versions_->LevelSummary(&tmp)); + Log(options_.info_log, "compacted to: %s", versions_->LevelSummary(&tmp)); return status; } namespace { + struct IterState { - port::Mutex* mu; - Version* version; - MemTable* mem; - MemTable* imm; + port::Mutex* const mu; + Version* const version GUARDED_BY(mu); + MemTable* const mem GUARDED_BY(mu); + MemTable* const imm GUARDED_BY(mu); + + IterState(port::Mutex* mutex, MemTable* mem, MemTable* imm, Version* version) + : mu(mutex), version(version), mem(mem), imm(imm) {} }; static void CleanupIteratorState(void* arg1, void* arg2) { IterState* state = reinterpret_cast<IterState*>(arg1); state->mu->Lock(); state->mem->Unref(); - if (state->imm != NULL) state->imm->Unref(); + if (state->imm != nullptr) state->imm->Unref(); state->version->Unref(); state->mu->Unlock(); delete state; } -} // namespace + +} // anonymous namespace Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, SequenceNumber* latest_snapshot, uint32_t* seed) { - IterState* cleanup = new IterState; mutex_.Lock(); *latest_snapshot = versions_->LastSequence();
@@ -1075,7 +1080,7 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, std::vector<Iterator*> list; list.push_back(mem_->NewIterator()); mem_->Ref(); - if (imm_ != NULL) { + if (imm_ != nullptr) { list.push_back(imm_->NewIterator()); imm_->Ref(); }
@@ -1084,11 +1089,8 @@ NewMergingIterator(&internal_comparator_, &list[0], list.size()); versions_->current()->Ref(); - cleanup->mu = &mutex_; - cleanup->mem = mem_; - cleanup->imm = imm_; - cleanup->version = versions_->current(); - internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, NULL); + IterState* cleanup = new IterState(&mutex_, mem_, imm_, versions_->current()); + internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr); *seed = ++seed_; mutex_.Unlock();
@@ -1106,14 +1108,14 @@ return versions_->MaxNextLevelOverlappingBytes(); } -Status DBImpl::Get(const ReadOptions& options, - const Slice& key, +Status DBImpl::Get(const ReadOptions& options, const Slice& key, std::string* value) { Status s; MutexLock l(&mutex_); SequenceNumber snapshot; - if (options.snapshot != NULL) { - snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_; + if (options.snapshot != nullptr) { + snapshot = + static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number(); } else { snapshot = versions_->LastSequence(); }
@@ -1122,7 +1124,7 @@ MemTable* imm = imm_; Version* current = versions_->current(); mem->Ref(); - if (imm != NULL) imm->Ref(); + if (imm != nullptr) imm->Ref(); current->Ref(); bool have_stat_update = false;
@@ -1135,7 +1137,7 @@ LookupKey lkey(key, snapshot); if (mem->Get(lkey, value, &s)) { // Done - } else if (imm != NULL && imm->Get(lkey, value, &s)) { + } else if (imm != nullptr && imm->Get(lkey, value, &s)) { // Done } else { s = current->Get(options, lkey, value, &stats);
@@ -1148,7 +1150,7 @@ MaybeScheduleCompaction(); } mem->Unref(); - if (imm != NULL) imm->Unref(); + if (imm != nullptr) imm->Unref(); current->Unref(); return s; }
@@ -1157,12 +1159,12 @@ SequenceNumber latest_snapshot; uint32_t seed; Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed); - return NewDBIterator( - this, user_comparator(), iter, - (options.snapshot != NULL - ? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_ - : latest_snapshot), - seed); + return NewDBIterator(this, user_comparator(), iter, + (options.snapshot != nullptr + ? static_cast<const SnapshotImpl*>(options.snapshot) + ->sequence_number() + : latest_snapshot), + seed); } void DBImpl::RecordReadSample(Slice key) {
@@ -1177,9 +1179,9 @@ const Snapshot* DBImpl::GetSnapshot() { return snapshots_.New(versions_->LastSequence()); } -void DBImpl::ReleaseSnapshot(const Snapshot* s) { +void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) { MutexLock l(&mutex_); - snapshots_.Delete(reinterpret_cast<const SnapshotImpl*>(s)); + snapshots_.Delete(static_cast<const SnapshotImpl*>(snapshot)); } // Convenience methods
@@ -1191,9 +1193,9 @@ Status DBImpl::Delete(const WriteOptions& options, const Slice& key) { return DB::Delete(options, key); } -Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { +Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { Writer w(&mutex_); - w.batch = my_batch; + w.batch = updates; w.sync = options.sync; w.done = false;
@@ -1207,13 +1209,13 @@ } // May temporarily unlock and wait. - Status status = MakeRoomForWrite(my_batch == NULL); + Status status = MakeRoomForWrite(updates == nullptr); uint64_t last_sequence = versions_->LastSequence(); Writer* last_writer = &w; - if (status.ok() && my_batch != NULL) { // NULL batch is for compactions - WriteBatch* updates = BuildBatchGroup(&last_writer); - WriteBatchInternal::SetSequence(updates, last_sequence + 1); - last_sequence += WriteBatchInternal::Count(updates); + if (status.ok() && updates != nullptr) { // nullptr batch is for compactions + WriteBatch* write_batch = BuildBatchGroup(&last_writer); + WriteBatchInternal::SetSequence(write_batch, last_sequence + 1); + last_sequence += WriteBatchInternal::Count(write_batch); // Add to log and apply to memtable. We can release the lock // during this phase since &w is currently responsible for logging // into mem_. { mutex_.Unlock(); - status = log_->AddRecord(WriteBatchInternal::Contents(updates)); + status = log_->AddRecord(WriteBatchInternal::Contents(write_batch)); bool sync_error = false; if (status.ok() && options.sync) { status = logfile_->Sync();
@@ -1230,7 +1232,7 @@ } } if (status.ok()) { - status = WriteBatchInternal::InsertInto(updates, mem_); + status = WriteBatchInternal::InsertInto(write_batch, mem_); } mutex_.Lock(); if (sync_error) {
@@ -1240,7 +1242,7 @@ RecordBackgroundError(status); } } - if (updates == tmp_batch_) tmp_batch_->Clear(); + if (write_batch == tmp_batch_) tmp_batch_->Clear(); versions_->SetLastSequence(last_sequence); }
@@ -1265,12 +1267,13 @@ } // REQUIRES: Writer list must be non-empty -// REQUIRES: First writer must have a non-NULL batch +// REQUIRES: First writer must have a non-null batch WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) { + mutex_.AssertHeld(); assert(!writers_.empty()); Writer* first = writers_.front(); WriteBatch* result = first->batch; - assert(result != NULL); + assert(result != nullptr); size_t size = WriteBatchInternal::ByteSize(first->batch);
@@ -1278,8 +1281,8 @@ WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) { // original write is small, limit the growth so we do not slow // down the small write too much.
size_t max_size = 1 << 20; - if (size <= (128<<10)) { - max_size = size + (128<<10); + if (size <= (128 << 10)) { + max_size = size + (128 << 10); } *last_writer = first; @@ -1292,7 +1295,7 @@ WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) { break; } - if (w->batch != NULL) { + if (w->batch != nullptr) { size += WriteBatchInternal::ByteSize(w->batch); if (size > max_size) { // Do not make batch too big @@ -1325,9 +1328,8 @@ Status DBImpl::MakeRoomForWrite(bool force) { // Yield previous error s = bg_error_; break; - } else if ( - allow_delay && - versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) { + } else if (allow_delay && versions_->NumLevelFiles(0) >= + config::kL0_SlowdownWritesTrigger) { // We are getting close to hitting a hard limit on the number of // L0 files. Rather than delaying a single write by several // seconds when we hit the hard limit, start delaying each @@ -1342,20 +1344,20 @@ Status DBImpl::MakeRoomForWrite(bool force) { (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) { // There is room in current memtable break; - } else if (imm_ != NULL) { + } else if (imm_ != nullptr) { // We have filled up the current memtable, but the previous // one is still being compacted, so we wait. Log(options_.info_log, "Current memtable full; waiting...\n"); - bg_cv_.Wait(); + background_work_finished_signal_.Wait(); } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) { // There are too many level-0 files. Log(options_.info_log, "Too many L0 files; waiting...\n"); - bg_cv_.Wait(); + background_work_finished_signal_.Wait(); } else { // Attempt to switch to a new memtable and trigger compaction of old assert(versions_->PrevLogNumber() == 0); uint64_t new_log_number = versions_->NewFileNumber(); - WritableFile* lfile = NULL; + WritableFile* lfile = nullptr; s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile); if (!s.ok()) { // Avoid chewing through file number space in a tight loop. 
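Note: the MakeRoomForWrite() hunks above keep the classic condition-variable discipline under the new background_work_finished_signal_ name: the condition is re-tested in a loop around Wait(), because a wakeup may be spurious or may signal a different state change. A generic sketch with std::condition_variable standing in for leveldb's port::CondVar (names are illustrative):

    #include <condition_variable>
    #include <mutex>

    std::mutex mu;
    std::condition_variable background_done;
    bool room_available = false;  // set by the background thread

    void WaitForRoom() {
      std::unique_lock<std::mutex> lock(mu);
      while (!room_available) {  // loop guards against spurious wakeups
        background_done.wait(lock);
      }
    }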
@@ -1368,10 +1370,10 @@ Status DBImpl::MakeRoomForWrite(bool force) { logfile_number_ = new_log_number; log_ = new log::Writer(lfile); imm_ = mem_; - has_imm_.Release_Store(imm_); + has_imm_.store(true, std::memory_order_release); mem_ = new MemTable(internal_comparator_); mem_->Ref(); - force = false; // Do not force another compaction if have room + force = false; // Do not force another compaction if have room MaybeScheduleCompaction(); } } @@ -1405,21 +1407,16 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) { snprintf(buf, sizeof(buf), " Compactions\n" "Level Files Size(MB) Time(sec) Read(MB) Write(MB)\n" - "--------------------------------------------------\n" - ); + "--------------------------------------------------\n"); value->append(buf); for (int level = 0; level < config::kNumLevels; level++) { int files = versions_->NumLevelFiles(level); if (stats_[level].micros > 0 || files > 0) { - snprintf( - buf, sizeof(buf), - "%3d %8d %8.0f %9.0f %8.0f %9.0f\n", - level, - files, - versions_->NumLevelBytes(level) / 1048576.0, - stats_[level].micros / 1e6, - stats_[level].bytes_read / 1048576.0, - stats_[level].bytes_written / 1048576.0); + snprintf(buf, sizeof(buf), "%3d %8d %8.0f %9.0f %8.0f %9.0f\n", level, + files, versions_->NumLevelBytes(level) / 1048576.0, + stats_[level].micros / 1e6, + stats_[level].bytes_read / 1048576.0, + stats_[level].bytes_written / 1048576.0); value->append(buf); } } @@ -1445,16 +1442,11 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) { return false; } -void DBImpl::GetApproximateSizes( - const Range* range, int n, - uint64_t* sizes) { +void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) { // TODO(opt): better implementation - Version* v; - { - MutexLock l(&mutex_); - versions_->current()->Ref(); - v = versions_->current(); - } + MutexLock l(&mutex_); + Version* v = versions_->current(); + v->Ref(); for (int i = 0; i < n; i++) { // Convert user_key into a corresponding internal key. @@ -1465,10 +1457,7 @@ void DBImpl::GetApproximateSizes( sizes[i] = (limit >= start ? limit - start : 0); } - { - MutexLock l(&mutex_); - v->Unref(); - } + v->Unref(); } // Default implementations of convenience methods that subclasses of DB @@ -1485,11 +1474,10 @@ Status DB::Delete(const WriteOptions& opt, const Slice& key) { return Write(opt, &batch); } -DB::~DB() { } +DB::~DB() = default; -Status DB::Open(const Options& options, const std::string& dbname, - DB** dbptr) { - *dbptr = NULL; +Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { + *dbptr = nullptr; DBImpl* impl = new DBImpl(options, dbname); impl->mutex_.Lock(); @@ -1497,7 +1485,7 @@ Status DB::Open(const Options& options, const std::string& dbname, // Recover handles create_if_missing, error_if_exists bool save_manifest = false; Status s = impl->Recover(&edit, &save_manifest); - if (s.ok() && impl->mem_ == NULL) { + if (s.ok() && impl->mem_ == nullptr) { // Create new log and a corresponding memtable. 
uint64_t new_log_number = impl->versions_->NewFileNumber(); WritableFile* lfile; @@ -1523,7 +1511,7 @@ Status DB::Open(const Options& options, const std::string& dbname, } impl->mutex_.Unlock(); if (s.ok()) { - assert(impl->mem_ != NULL); + assert(impl->mem_ != nullptr); *dbptr = impl; } else { delete impl; @@ -1531,21 +1519,20 @@ Status DB::Open(const Options& options, const std::string& dbname, return s; } -Snapshot::~Snapshot() { -} +Snapshot::~Snapshot() = default; Status DestroyDB(const std::string& dbname, const Options& options) { Env* env = options.env; std::vector<std::string> filenames; - // Ignore error in case directory does not exist - env->GetChildren(dbname, &filenames); - if (filenames.empty()) { + Status result = env->GetChildren(dbname, &filenames); + if (!result.ok()) { + // Ignore error in case directory does not exist return Status::OK(); } FileLock* lock; const std::string lockname = LockFileName(dbname); - Status result = env->LockFile(lockname, &lock); + result = env->LockFile(lockname, &lock); if (result.ok()) { uint64_t number; FileType type; |
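Note: one last pattern worth calling out from the snapshot hunks above: reinterpret_cast<const SnapshotImpl*> becomes static_cast<const SnapshotImpl*>, and the public number_ field becomes a sequence_number() accessor. static_cast is the appropriate tool here because this is a base-to-derived cast the code can guarantee, since the DB only ever hands out SnapshotImpl objects. A compact sketch of the shape, not the real classes:

    #include <cstdint>

    class Snapshot {
     protected:
      ~Snapshot() = default;  // clients cannot delete snapshots directly
    };

    class SnapshotImpl : public Snapshot {
     public:
      explicit SnapshotImpl(uint64_t seq) : sequence_number_(seq) {}
      uint64_t sequence_number() const { return sequence_number_; }
     private:
      const uint64_t sequence_number_;
    };

    uint64_t SeqOf(const Snapshot* s) {
      // Safe downcast: every Snapshot the DB hands out is a SnapshotImpl.
      return static_cast<const SnapshotImpl*>(s)->sequence_number();
    }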