aboutsummaryrefslogtreecommitdiff
path: root/src/leveldb/db/db_impl.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/leveldb/db/db_impl.cc')
-rw-r--r--src/leveldb/db/db_impl.cc195
1 files changed, 125 insertions, 70 deletions
diff --git a/src/leveldb/db/db_impl.cc b/src/leveldb/db/db_impl.cc
index 49b95953b4..f43ad76794 100644
--- a/src/leveldb/db/db_impl.cc
+++ b/src/leveldb/db/db_impl.cc
@@ -96,6 +96,7 @@ Options SanitizeOptions(const std::string& dbname,
result.filter_policy = (src.filter_policy != NULL) ? ipolicy : NULL;
ClipToRange(&result.max_open_files, 64 + kNumNonTableCacheFiles, 50000);
ClipToRange(&result.write_buffer_size, 64<<10, 1<<30);
+ ClipToRange(&result.max_file_size, 1<<20, 1<<30);
ClipToRange(&result.block_size, 1<<10, 4<<20);
if (result.info_log == NULL) {
// Open a log file in the same directory as the db
@@ -125,7 +126,7 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
db_lock_(NULL),
shutting_down_(NULL),
bg_cv_(&mutex_),
- mem_(new MemTable(internal_comparator_)),
+ mem_(NULL),
imm_(NULL),
logfile_(NULL),
logfile_number_(0),
@@ -134,7 +135,6 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
tmp_batch_(new WriteBatch),
bg_compaction_scheduled_(false),
manual_compaction_(NULL) {
- mem_->Ref();
has_imm_.Release_Store(NULL);
// Reserve ten files or so for other uses and give the rest to TableCache.
@@ -271,7 +271,7 @@ void DBImpl::DeleteObsoleteFiles() {
}
}
-Status DBImpl::Recover(VersionEdit* edit) {
+Status DBImpl::Recover(VersionEdit* edit, bool *save_manifest) {
mutex_.AssertHeld();
// Ignore error from CreateDir since the creation of the DB is
@@ -301,66 +301,69 @@ Status DBImpl::Recover(VersionEdit* edit) {
}
}
- s = versions_->Recover();
- if (s.ok()) {
- SequenceNumber max_sequence(0);
-
- // Recover from all newer log files than the ones named in the
- // descriptor (new log files may have been added by the previous
- // incarnation without registering them in the descriptor).
- //
- // Note that PrevLogNumber() is no longer used, but we pay
- // attention to it in case we are recovering a database
- // produced by an older version of leveldb.
- const uint64_t min_log = versions_->LogNumber();
- const uint64_t prev_log = versions_->PrevLogNumber();
- std::vector<std::string> filenames;
- s = env_->GetChildren(dbname_, &filenames);
+ s = versions_->Recover(save_manifest);
+ if (!s.ok()) {
+ return s;
+ }
+ SequenceNumber max_sequence(0);
+
+ // Recover from all newer log files than the ones named in the
+ // descriptor (new log files may have been added by the previous
+ // incarnation without registering them in the descriptor).
+ //
+ // Note that PrevLogNumber() is no longer used, but we pay
+ // attention to it in case we are recovering a database
+ // produced by an older version of leveldb.
+ const uint64_t min_log = versions_->LogNumber();
+ const uint64_t prev_log = versions_->PrevLogNumber();
+ std::vector<std::string> filenames;
+ s = env_->GetChildren(dbname_, &filenames);
+ if (!s.ok()) {
+ return s;
+ }
+ std::set<uint64_t> expected;
+ versions_->AddLiveFiles(&expected);
+ uint64_t number;
+ FileType type;
+ std::vector<uint64_t> logs;
+ for (size_t i = 0; i < filenames.size(); i++) {
+ if (ParseFileName(filenames[i], &number, &type)) {
+ expected.erase(number);
+ if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
+ logs.push_back(number);
+ }
+ }
+ if (!expected.empty()) {
+ char buf[50];
+ snprintf(buf, sizeof(buf), "%d missing files; e.g.",
+ static_cast<int>(expected.size()));
+ return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
+ }
+
+ // Recover in the order in which the logs were generated
+ std::sort(logs.begin(), logs.end());
+ for (size_t i = 0; i < logs.size(); i++) {
+ s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
+ &max_sequence);
if (!s.ok()) {
return s;
}
- std::set<uint64_t> expected;
- versions_->AddLiveFiles(&expected);
- uint64_t number;
- FileType type;
- std::vector<uint64_t> logs;
- for (size_t i = 0; i < filenames.size(); i++) {
- if (ParseFileName(filenames[i], &number, &type)) {
- expected.erase(number);
- if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
- logs.push_back(number);
- }
- }
- if (!expected.empty()) {
- char buf[50];
- snprintf(buf, sizeof(buf), "%d missing files; e.g.",
- static_cast<int>(expected.size()));
- return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
- }
-
- // Recover in the order in which the logs were generated
- std::sort(logs.begin(), logs.end());
- for (size_t i = 0; i < logs.size(); i++) {
- s = RecoverLogFile(logs[i], edit, &max_sequence);
- // The previous incarnation may not have written any MANIFEST
- // records after allocating this log number. So we manually
- // update the file number allocation counter in VersionSet.
- versions_->MarkFileNumberUsed(logs[i]);
- }
+ // The previous incarnation may not have written any MANIFEST
+ // records after allocating this log number. So we manually
+ // update the file number allocation counter in VersionSet.
+ versions_->MarkFileNumberUsed(logs[i]);
+ }
- if (s.ok()) {
- if (versions_->LastSequence() < max_sequence) {
- versions_->SetLastSequence(max_sequence);
- }
- }
+ if (versions_->LastSequence() < max_sequence) {
+ versions_->SetLastSequence(max_sequence);
}
- return s;
+ return Status::OK();
}
-Status DBImpl::RecoverLogFile(uint64_t log_number,
- VersionEdit* edit,
+Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
+ bool* save_manifest, VersionEdit* edit,
SequenceNumber* max_sequence) {
struct LogReporter : public log::Reader::Reporter {
Env* env;
@@ -405,6 +408,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
std::string scratch;
Slice record;
WriteBatch batch;
+ int compactions = 0;
MemTable* mem = NULL;
while (reader.ReadRecord(&record, &scratch) &&
status.ok()) {
@@ -432,25 +436,52 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
}
if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
+ compactions++;
+ *save_manifest = true;
status = WriteLevel0Table(mem, edit, NULL);
+ mem->Unref();
+ mem = NULL;
if (!status.ok()) {
// Reflect errors immediately so that conditions like full
// file-systems cause the DB::Open() to fail.
break;
}
- mem->Unref();
- mem = NULL;
}
}
- if (status.ok() && mem != NULL) {
- status = WriteLevel0Table(mem, edit, NULL);
- // Reflect errors immediately so that conditions like full
- // file-systems cause the DB::Open() to fail.
+ delete file;
+
+ // See if we should keep reusing the last log file.
+ if (status.ok() && options_.reuse_logs && last_log && compactions == 0) {
+ assert(logfile_ == NULL);
+ assert(log_ == NULL);
+ assert(mem_ == NULL);
+ uint64_t lfile_size;
+ if (env_->GetFileSize(fname, &lfile_size).ok() &&
+ env_->NewAppendableFile(fname, &logfile_).ok()) {
+ Log(options_.info_log, "Reusing old log %s \n", fname.c_str());
+ log_ = new log::Writer(logfile_, lfile_size);
+ logfile_number_ = log_number;
+ if (mem != NULL) {
+ mem_ = mem;
+ mem = NULL;
+ } else {
+ // mem can be NULL if lognum exists but was empty.
+ mem_ = new MemTable(internal_comparator_);
+ mem_->Ref();
+ }
+ }
+ }
+
+ if (mem != NULL) {
+ // mem did not get reused; compact it.
+ if (status.ok()) {
+ *save_manifest = true;
+ status = WriteLevel0Table(mem, edit, NULL);
+ }
+ mem->Unref();
}
- if (mem != NULL) mem->Unref();
- delete file;
return status;
}
@@ -821,8 +852,9 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
delete iter;
if (s.ok()) {
Log(options_.info_log,
- "Generated table #%llu: %lld keys, %lld bytes",
+ "Generated table #%llu@%d: %lld keys, %lld bytes",
(unsigned long long) output_number,
+ compact->compaction->level(),
(unsigned long long) current_entries,
(unsigned long long) current_bytes);
}
@@ -1395,6 +1427,19 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
} else if (in == "sstables") {
*value = versions_->current()->DebugString();
return true;
+ } else if (in == "approximate-memory-usage") {
+ size_t total_usage = options_.block_cache->TotalCharge();
+ if (mem_) {
+ total_usage += mem_->ApproximateMemoryUsage();
+ }
+ if (imm_) {
+ total_usage += imm_->ApproximateMemoryUsage();
+ }
+ char buf[50];
+ snprintf(buf, sizeof(buf), "%llu",
+ static_cast<unsigned long long>(total_usage));
+ value->append(buf);
+ return true;
}
return false;
@@ -1449,8 +1494,11 @@ Status DB::Open(const Options& options, const std::string& dbname,
DBImpl* impl = new DBImpl(options, dbname);
impl->mutex_.Lock();
VersionEdit edit;
- Status s = impl->Recover(&edit); // Handles create_if_missing, error_if_exists
- if (s.ok()) {
+ // Recover handles create_if_missing, error_if_exists
+ bool save_manifest = false;
+ Status s = impl->Recover(&edit, &save_manifest);
+ if (s.ok() && impl->mem_ == NULL) {
+ // Create new log and a corresponding memtable.
uint64_t new_log_number = impl->versions_->NewFileNumber();
WritableFile* lfile;
s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
@@ -1460,15 +1508,22 @@ Status DB::Open(const Options& options, const std::string& dbname,
impl->logfile_ = lfile;
impl->logfile_number_ = new_log_number;
impl->log_ = new log::Writer(lfile);
- s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
- }
- if (s.ok()) {
- impl->DeleteObsoleteFiles();
- impl->MaybeScheduleCompaction();
+ impl->mem_ = new MemTable(impl->internal_comparator_);
+ impl->mem_->Ref();
}
}
+ if (s.ok() && save_manifest) {
+ edit.SetPrevLogNumber(0); // No older logs needed after recovery.
+ edit.SetLogNumber(impl->logfile_number_);
+ s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
+ }
+ if (s.ok()) {
+ impl->DeleteObsoleteFiles();
+ impl->MaybeScheduleCompaction();
+ }
impl->mutex_.Unlock();
if (s.ok()) {
+ assert(impl->mem_ != NULL);
*dbptr = impl;
} else {
delete impl;