diff options
author | Pieter Wuille <pieter.wuille@gmail.com> | 2013-12-12 22:08:18 +0100 |
---|---|---|
committer | Pieter Wuille <pieter.wuille@gmail.com> | 2013-12-12 22:08:18 +0100 |
commit | ed873a301eeff8c021ba57c78fb702fd2b6f0452 (patch) | |
tree | 3fb5bcb91b8ceab464a50e63b6fce407eccb17e2 | |
parent | fb34be6d5905d36a2d9a392d87aa73bdb9470a1b (diff) | |
parent | 55c68902946c8c9bacea748c159be456a7c9c180 (diff) |
Merge src/leveldb changes for LevelDB 1.15
31 files changed, 448 insertions, 365 deletions
diff --git a/src/leveldb/AUTHORS b/src/leveldb/AUTHORS index fc40194ab9..2439d7a452 100644 --- a/src/leveldb/AUTHORS +++ b/src/leveldb/AUTHORS @@ -9,3 +9,4 @@ Sanjay Ghemawat <sanjay@google.com> # Partial list of contributors: Kevin Regan <kevin.d.regan@gmail.com> +Johan Bilien <jobi@litl.com> diff --git a/src/leveldb/Makefile b/src/leveldb/Makefile index 20c9c4f287..344ff2972a 100644 --- a/src/leveldb/Makefile +++ b/src/leveldb/Makefile @@ -44,6 +44,7 @@ TESTS = \ filename_test \ filter_block_test \ issue178_test \ + issue200_test \ log_test \ memenv_test \ skiplist_test \ @@ -71,7 +72,7 @@ SHARED = $(SHARED1) else # Update db.h if you change these. SHARED_MAJOR = 1 -SHARED_MINOR = 13 +SHARED_MINOR = 15 SHARED1 = libleveldb.$(PLATFORM_SHARED_EXT) SHARED2 = $(SHARED1).$(SHARED_MAJOR) SHARED3 = $(SHARED1).$(SHARED_MAJOR).$(SHARED_MINOR) @@ -154,6 +155,9 @@ filter_block_test: table/filter_block_test.o $(LIBOBJECTS) $(TESTHARNESS) issue178_test: issues/issue178_test.o $(LIBOBJECTS) $(TESTHARNESS) $(CXX) $(LDFLAGS) issues/issue178_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS) +issue200_test: issues/issue200_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(CXX) $(LDFLAGS) issues/issue200_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS) + log_test: db/log_test.o $(LIBOBJECTS) $(TESTHARNESS) $(CXX) $(LDFLAGS) db/log_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS) @@ -191,14 +195,14 @@ IOSVERSION=$(shell defaults read $(PLATFORMSROOT)/iPhoneOS.platform/version CFBu mkdir -p ios-x86/$(dir $@) $(CXX) $(CXXFLAGS) -isysroot $(SIMULATORROOT)/SDKs/iPhoneSimulator$(IOSVERSION).sdk -arch i686 -c $< -o ios-x86/$@ mkdir -p ios-arm/$(dir $@) - $(DEVICEROOT)/usr/bin/$(CXX) $(CXXFLAGS) -isysroot $(DEVICEROOT)/SDKs/iPhoneOS$(IOSVERSION).sdk -arch armv6 -arch armv7 -c $< -o ios-arm/$@ + xcrun -sdk iphoneos $(CXX) $(CXXFLAGS) -isysroot $(DEVICEROOT)/SDKs/iPhoneOS$(IOSVERSION).sdk -arch armv6 -arch armv7 -c $< -o ios-arm/$@ lipo ios-x86/$@ ios-arm/$@ -create -output $@ .c.o: mkdir -p ios-x86/$(dir $@) $(CC) $(CFLAGS) -isysroot $(SIMULATORROOT)/SDKs/iPhoneSimulator$(IOSVERSION).sdk -arch i686 -c $< -o ios-x86/$@ mkdir -p ios-arm/$(dir $@) - $(DEVICEROOT)/usr/bin/$(CC) $(CFLAGS) -isysroot $(DEVICEROOT)/SDKs/iPhoneOS$(IOSVERSION).sdk -arch armv6 -arch armv7 -c $< -o ios-arm/$@ + xcrun -sdk iphoneos $(CC) $(CFLAGS) -isysroot $(DEVICEROOT)/SDKs/iPhoneOS$(IOSVERSION).sdk -arch armv6 -arch armv7 -c $< -o ios-arm/$@ lipo ios-x86/$@ ios-arm/$@ -create -output $@ else diff --git a/src/leveldb/build_detect_platform b/src/leveldb/build_detect_platform index bdfd64172c..85b1ce0224 100755 --- a/src/leveldb/build_detect_platform +++ b/src/leveldb/build_detect_platform @@ -137,6 +137,16 @@ case "$TARGET_OS" in # man ld: +h internal_name PLATFORM_SHARED_LDFLAGS="-shared -Wl,+h -Wl," ;; + IOS) + PLATFORM=IOS + COMMON_FLAGS="$MEMCMP_FLAG -DOS_MACOSX" + [ -z "$INSTALL_PATH" ] && INSTALL_PATH=`pwd` + PORT_FILE=port/port_posix.cc + PLATFORM_SHARED_EXT= + PLATFORM_SHARED_LDFLAGS= + PLATFORM_SHARED_CFLAGS= + PLATFORM_SHARED_VERSIONED= + ;; OS_WINDOWS_CROSSCOMPILE | NATIVE_WINDOWS) PLATFORM=OS_WINDOWS COMMON_FLAGS="-fno-builtin-memcmp -D_REENTRANT -DOS_WINDOWS -DLEVELDB_PLATFORM_WINDOWS -DWINVER=0x0500 -D__USE_MINGW_ANSI_STDIO=1" diff --git a/src/leveldb/db/corruption_test.cc b/src/leveldb/db/corruption_test.cc index b37ffdfe64..96afc68913 100644 --- a/src/leveldb/db/corruption_test.cc +++ b/src/leveldb/db/corruption_test.cc @@ -75,7 +75,13 @@ class CorruptionTest { Slice key = Key(i, &key_space); batch.Clear(); batch.Put(key, Value(i, &value_space)); - ASSERT_OK(db_->Write(WriteOptions(), &batch)); + WriteOptions options; + // Corrupt() doesn't work without this sync on windows; stat reports 0 for + // the file size. + if (i == n - 1) { + options.sync = true; + } + ASSERT_OK(db_->Write(options, &batch)); } } @@ -125,7 +131,7 @@ class CorruptionTest { FileType type; std::string fname; int picked_number = -1; - for (int i = 0; i < filenames.size(); i++) { + for (size_t i = 0; i < filenames.size(); i++) { if (ParseFileName(filenames[i], &number, &type) && type == filetype && int(number) > picked_number) { // Pick latest file @@ -238,6 +244,22 @@ TEST(CorruptionTest, TableFile) { Check(90, 99); } +TEST(CorruptionTest, TableFileRepair) { + options_.block_size = 2 * kValueSize; // Limit scope of corruption + options_.paranoid_checks = true; + Reopen(); + Build(100); + DBImpl* dbi = reinterpret_cast<DBImpl*>(db_); + dbi->TEST_CompactMemTable(); + dbi->TEST_CompactRange(0, NULL, NULL); + dbi->TEST_CompactRange(1, NULL, NULL); + + Corrupt(kTableFile, 100, 1); + RepairDB(); + Reopen(); + Check(95, 99); +} + TEST(CorruptionTest, TableFileIndexData) { Build(10000); // Enough to build multiple Tables DBImpl* dbi = reinterpret_cast<DBImpl*>(db_); diff --git a/src/leveldb/db/db_bench.cc b/src/leveldb/db/db_bench.cc index 7abdf87587..fc46d89693 100644 --- a/src/leveldb/db/db_bench.cc +++ b/src/leveldb/db/db_bench.cc @@ -128,7 +128,7 @@ class RandomGenerator { pos_ = 0; } - Slice Generate(int len) { + Slice Generate(size_t len) { if (pos_ + len > data_.size()) { pos_ = 0; assert(len < data_.size()); @@ -139,11 +139,11 @@ class RandomGenerator { }; static Slice TrimSpace(Slice s) { - int start = 0; + size_t start = 0; while (start < s.size() && isspace(s[start])) { start++; } - int limit = s.size(); + size_t limit = s.size(); while (limit > start && isspace(s[limit-1])) { limit--; } @@ -399,7 +399,7 @@ class Benchmark { heap_counter_(0) { std::vector<std::string> files; Env::Default()->GetChildren(FLAGS_db, &files); - for (int i = 0; i < files.size(); i++) { + for (size_t i = 0; i < files.size(); i++) { if (Slice(files[i]).starts_with("heap-")) { Env::Default()->DeleteFile(std::string(FLAGS_db) + "/" + files[i]); } diff --git a/src/leveldb/db/db_impl.cc b/src/leveldb/db/db_impl.cc index fa1351038b..faf5e7d7ba 100644 --- a/src/leveldb/db/db_impl.cc +++ b/src/leveldb/db/db_impl.cc @@ -133,8 +133,7 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) seed_(0), tmp_batch_(new WriteBatch), bg_compaction_scheduled_(false), - manual_compaction_(NULL), - consecutive_compaction_errors_(0) { + manual_compaction_(NULL) { mem_->Ref(); has_imm_.Release_Store(NULL); @@ -217,6 +216,12 @@ void DBImpl::MaybeIgnoreError(Status* s) const { } void DBImpl::DeleteObsoleteFiles() { + if (!bg_error_.ok()) { + // After a background error, we don't know whether a new version may + // or may not have been committed, so we cannot safely garbage collect. + return; + } + // Make a set of all of the live files std::set<uint64_t> live = pending_outputs_; versions_->AddLiveFiles(&live); @@ -495,7 +500,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, return s; } -Status DBImpl::CompactMemTable() { +void DBImpl::CompactMemTable() { mutex_.AssertHeld(); assert(imm_ != NULL); @@ -523,9 +528,9 @@ Status DBImpl::CompactMemTable() { imm_ = NULL; has_imm_.Release_Store(NULL); DeleteObsoleteFiles(); + } else { + RecordBackgroundError(s); } - - return s; } void DBImpl::CompactRange(const Slice* begin, const Slice* end) { @@ -568,16 +573,18 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) { } MutexLock l(&mutex_); - while (!manual.done) { - while (manual_compaction_ != NULL) { - bg_cv_.Wait(); - } - manual_compaction_ = &manual; - MaybeScheduleCompaction(); - while (manual_compaction_ == &manual) { + while (!manual.done && !shutting_down_.Acquire_Load() && bg_error_.ok()) { + if (manual_compaction_ == NULL) { // Idle + manual_compaction_ = &manual; + MaybeScheduleCompaction(); + } else { // Running either my compaction or another compaction. bg_cv_.Wait(); } } + if (manual_compaction_ == &manual) { + // Cancel my manual compaction since we aborted early for some reason. + manual_compaction_ = NULL; + } } Status DBImpl::TEST_CompactMemTable() { @@ -596,12 +603,22 @@ Status DBImpl::TEST_CompactMemTable() { return s; } +void DBImpl::RecordBackgroundError(const Status& s) { + mutex_.AssertHeld(); + if (bg_error_.ok()) { + bg_error_ = s; + bg_cv_.SignalAll(); + } +} + void DBImpl::MaybeScheduleCompaction() { mutex_.AssertHeld(); if (bg_compaction_scheduled_) { // Already scheduled } else if (shutting_down_.Acquire_Load()) { // DB is being deleted; no more background compactions + } else if (!bg_error_.ok()) { + // Already got an error; no more changes } else if (imm_ == NULL && manual_compaction_ == NULL && !versions_->NeedsCompaction()) { @@ -619,30 +636,12 @@ void DBImpl::BGWork(void* db) { void DBImpl::BackgroundCall() { MutexLock l(&mutex_); assert(bg_compaction_scheduled_); - if (!shutting_down_.Acquire_Load()) { - Status s = BackgroundCompaction(); - if (s.ok()) { - // Success - consecutive_compaction_errors_ = 0; - } else if (shutting_down_.Acquire_Load()) { - // Error most likely due to shutdown; do not wait - } else { - // Wait a little bit before retrying background compaction in - // case this is an environmental problem and we do not want to - // chew up resources for failed compactions for the duration of - // the problem. - bg_cv_.SignalAll(); // In case a waiter can proceed despite the error - Log(options_.info_log, "Waiting after background compaction error: %s", - s.ToString().c_str()); - mutex_.Unlock(); - ++consecutive_compaction_errors_; - int seconds_to_sleep = 1; - for (int i = 0; i < 3 && i < consecutive_compaction_errors_ - 1; ++i) { - seconds_to_sleep *= 2; - } - env_->SleepForMicroseconds(seconds_to_sleep * 1000000); - mutex_.Lock(); - } + if (shutting_down_.Acquire_Load()) { + // No more background work when shutting down. + } else if (!bg_error_.ok()) { + // No more background work after a background error. + } else { + BackgroundCompaction(); } bg_compaction_scheduled_ = false; @@ -653,11 +652,12 @@ void DBImpl::BackgroundCall() { bg_cv_.SignalAll(); } -Status DBImpl::BackgroundCompaction() { +void DBImpl::BackgroundCompaction() { mutex_.AssertHeld(); if (imm_ != NULL) { - return CompactMemTable(); + CompactMemTable(); + return; } Compaction* c; @@ -691,6 +691,9 @@ Status DBImpl::BackgroundCompaction() { c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest, f->largest); status = versions_->LogAndApply(c->edit(), &mutex_); + if (!status.ok()) { + RecordBackgroundError(status); + } VersionSet::LevelSummaryStorage tmp; Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", static_cast<unsigned long long>(f->number), @@ -701,6 +704,9 @@ Status DBImpl::BackgroundCompaction() { } else { CompactionState* compact = new CompactionState(c); status = DoCompactionWork(compact); + if (!status.ok()) { + RecordBackgroundError(status); + } CleanupCompaction(compact); c->ReleaseInputs(); DeleteObsoleteFiles(); @@ -714,9 +720,6 @@ Status DBImpl::BackgroundCompaction() { } else { Log(options_.info_log, "Compaction error: %s", status.ToString().c_str()); - if (options_.paranoid_checks && bg_error_.ok()) { - bg_error_ = status; - } } if (is_manual) { @@ -732,7 +735,6 @@ Status DBImpl::BackgroundCompaction() { } manual_compaction_ = NULL; } - return status; } void DBImpl::CleanupCompaction(CompactionState* compact) { @@ -1002,6 +1004,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { if (status.ok()) { status = InstallCompactionResults(compact); } + if (!status.ok()) { + RecordBackgroundError(status); + } VersionSet::LevelSummaryStorage tmp; Log(options_.info_log, "compacted to: %s", versions_->LevelSummary(&tmp)); @@ -1185,13 +1190,23 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { { mutex_.Unlock(); status = log_->AddRecord(WriteBatchInternal::Contents(updates)); + bool sync_error = false; if (status.ok() && options.sync) { status = logfile_->Sync(); + if (!status.ok()) { + sync_error = true; + } } if (status.ok()) { status = WriteBatchInternal::InsertInto(updates, mem_); } mutex_.Lock(); + if (sync_error) { + // The state of the log file is indeterminate: the log record we + // just added may or may not show up when the DB is re-opened. + // So we force the DB into a mode where all future writes fail. + RecordBackgroundError(status); + } } if (updates == tmp_batch_) tmp_batch_->Clear(); diff --git a/src/leveldb/db/db_impl.h b/src/leveldb/db/db_impl.h index 75fd30abe9..cfc998164a 100644 --- a/src/leveldb/db/db_impl.h +++ b/src/leveldb/db/db_impl.h @@ -87,8 +87,8 @@ class DBImpl : public DB { // Compact the in-memory write buffer to disk. Switches to a new // log-file/memtable and writes a new descriptor iff successful. - Status CompactMemTable() - EXCLUSIVE_LOCKS_REQUIRED(mutex_); + // Errors are recorded in bg_error_. + void CompactMemTable() EXCLUSIVE_LOCKS_REQUIRED(mutex_); Status RecoverLogFile(uint64_t log_number, VersionEdit* edit, @@ -102,10 +102,12 @@ class DBImpl : public DB { EXCLUSIVE_LOCKS_REQUIRED(mutex_); WriteBatch* BuildBatchGroup(Writer** last_writer); + void RecordBackgroundError(const Status& s); + void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_); static void BGWork(void* db); void BackgroundCall(); - Status BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_); + void BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_); void CleanupCompaction(CompactionState* compact) EXCLUSIVE_LOCKS_REQUIRED(mutex_); Status DoCompactionWork(CompactionState* compact) @@ -170,7 +172,6 @@ class DBImpl : public DB { // Have we encountered a background error in paranoid mode? Status bg_error_; - int consecutive_compaction_errors_; // Per level compaction stats. stats_[level] stores the stats for // compactions that produced data for the specified "level". diff --git a/src/leveldb/db/db_iter.cc b/src/leveldb/db/db_iter.cc index 071a54e3f4..3b2035e9e3 100644 --- a/src/leveldb/db/db_iter.cc +++ b/src/leveldb/db/db_iter.cc @@ -161,12 +161,13 @@ void DBIter::Next() { saved_key_.clear(); return; } + // saved_key_ already contains the key to skip past. + } else { + // Store in saved_key_ the current key so we skip it below. + SaveKey(ExtractUserKey(iter_->key()), &saved_key_); } - // Temporarily use saved_key_ as storage for key to skip. - std::string* skip = &saved_key_; - SaveKey(ExtractUserKey(iter_->key()), skip); - FindNextUserEntry(true, skip); + FindNextUserEntry(true, &saved_key_); } void DBIter::FindNextUserEntry(bool skipping, std::string* skip) { diff --git a/src/leveldb/db/db_test.cc b/src/leveldb/db/db_test.cc index 49aae04dbd..280b01c14b 100644 --- a/src/leveldb/db/db_test.cc +++ b/src/leveldb/db/db_test.cc @@ -57,8 +57,11 @@ void DelayMilliseconds(int millis) { // Special Env used to delay background operations class SpecialEnv : public EnvWrapper { public: - // sstable Sync() calls are blocked while this pointer is non-NULL. - port::AtomicPointer delay_sstable_sync_; + // sstable/log Sync() calls are blocked while this pointer is non-NULL. + port::AtomicPointer delay_data_sync_; + + // sstable/log Sync() calls return an error. + port::AtomicPointer data_sync_error_; // Simulate no-space errors while this pointer is non-NULL. port::AtomicPointer no_space_; @@ -75,11 +78,9 @@ class SpecialEnv : public EnvWrapper { bool count_random_reads_; AtomicCounter random_read_counter_; - AtomicCounter sleep_counter_; - AtomicCounter sleep_time_counter_; - explicit SpecialEnv(Env* base) : EnvWrapper(base) { - delay_sstable_sync_.Release_Store(NULL); + delay_data_sync_.Release_Store(NULL); + data_sync_error_.Release_Store(NULL); no_space_.Release_Store(NULL); non_writable_.Release_Store(NULL); count_random_reads_ = false; @@ -88,17 +89,17 @@ class SpecialEnv : public EnvWrapper { } Status NewWritableFile(const std::string& f, WritableFile** r) { - class SSTableFile : public WritableFile { + class DataFile : public WritableFile { private: SpecialEnv* env_; WritableFile* base_; public: - SSTableFile(SpecialEnv* env, WritableFile* base) + DataFile(SpecialEnv* env, WritableFile* base) : env_(env), base_(base) { } - ~SSTableFile() { delete base_; } + ~DataFile() { delete base_; } Status Append(const Slice& data) { if (env_->no_space_.Acquire_Load() != NULL) { // Drop writes on the floor @@ -110,7 +111,10 @@ class SpecialEnv : public EnvWrapper { Status Close() { return base_->Close(); } Status Flush() { return base_->Flush(); } Status Sync() { - while (env_->delay_sstable_sync_.Acquire_Load() != NULL) { + if (env_->data_sync_error_.Acquire_Load() != NULL) { + return Status::IOError("simulated data sync error"); + } + while (env_->delay_data_sync_.Acquire_Load() != NULL) { DelayMilliseconds(100); } return base_->Sync(); @@ -147,8 +151,9 @@ class SpecialEnv : public EnvWrapper { Status s = target()->NewWritableFile(f, r); if (s.ok()) { - if (strstr(f.c_str(), ".sst") != NULL) { - *r = new SSTableFile(this, *r); + if (strstr(f.c_str(), ".ldb") != NULL || + strstr(f.c_str(), ".log") != NULL) { + *r = new DataFile(this, *r); } else if (strstr(f.c_str(), "MANIFEST") != NULL) { *r = new ManifestFile(this, *r); } @@ -179,12 +184,6 @@ class SpecialEnv : public EnvWrapper { } return s; } - - virtual void SleepForMicroseconds(int micros) { - sleep_counter_.Increment(); - sleep_time_counter_.IncrementBy(micros); - } - }; class DBTest { @@ -322,7 +321,7 @@ class DBTest { } // Check reverse iteration results are the reverse of forward results - int matched = 0; + size_t matched = 0; for (iter->SeekToLast(); iter->Valid(); iter->Prev()) { ASSERT_LT(matched, forward.size()); ASSERT_EQ(IterStatus(iter), forward[forward.size() - matched - 1]); @@ -484,6 +483,24 @@ class DBTest { } return false; } + + // Returns number of files renamed. + int RenameLDBToSST() { + std::vector<std::string> filenames; + ASSERT_OK(env_->GetChildren(dbname_, &filenames)); + uint64_t number; + FileType type; + int files_renamed = 0; + for (size_t i = 0; i < filenames.size(); i++) { + if (ParseFileName(filenames[i], &number, &type) && type == kTableFile) { + const std::string from = TableFileName(dbname_, number); + const std::string to = SSTTableFileName(dbname_, number); + ASSERT_OK(env_->RenameFile(from, to)); + files_renamed++; + } + } + return files_renamed; + } }; TEST(DBTest, Empty) { @@ -525,11 +542,11 @@ TEST(DBTest, GetFromImmutableLayer) { ASSERT_OK(Put("foo", "v1")); ASSERT_EQ("v1", Get("foo")); - env_->delay_sstable_sync_.Release_Store(env_); // Block sync calls + env_->delay_data_sync_.Release_Store(env_); // Block sync calls Put("k1", std::string(100000, 'x')); // Fill memtable Put("k2", std::string(100000, 'y')); // Trigger compaction ASSERT_EQ("v1", Get("foo")); - env_->delay_sstable_sync_.Release_Store(NULL); // Release sync calls + env_->delay_data_sync_.Release_Store(NULL); // Release sync calls } while (ChangeOptions()); } @@ -1516,41 +1533,13 @@ TEST(DBTest, NoSpace) { Compact("a", "z"); const int num_files = CountFiles(); env_->no_space_.Release_Store(env_); // Force out-of-space errors - env_->sleep_counter_.Reset(); - for (int i = 0; i < 5; i++) { + for (int i = 0; i < 10; i++) { for (int level = 0; level < config::kNumLevels-1; level++) { dbfull()->TEST_CompactRange(level, NULL, NULL); } } env_->no_space_.Release_Store(NULL); ASSERT_LT(CountFiles(), num_files + 3); - - // Check that compaction attempts slept after errors - ASSERT_GE(env_->sleep_counter_.Read(), 5); -} - -TEST(DBTest, ExponentialBackoff) { - Options options = CurrentOptions(); - options.env = env_; - Reopen(&options); - - ASSERT_OK(Put("foo", "v1")); - ASSERT_EQ("v1", Get("foo")); - Compact("a", "z"); - env_->non_writable_.Release_Store(env_); // Force errors for new files - env_->sleep_counter_.Reset(); - env_->sleep_time_counter_.Reset(); - for (int i = 0; i < 5; i++) { - dbfull()->TEST_CompactRange(2, NULL, NULL); - } - env_->non_writable_.Release_Store(NULL); - - // Wait for compaction to finish - DelayMilliseconds(1000); - - ASSERT_GE(env_->sleep_counter_.Read(), 5); - ASSERT_LT(env_->sleep_counter_.Read(), 10); - ASSERT_GE(env_->sleep_time_counter_.Read(), 10e6); } TEST(DBTest, NonWritableFileSystem) { @@ -1573,6 +1562,37 @@ TEST(DBTest, NonWritableFileSystem) { env_->non_writable_.Release_Store(NULL); } +TEST(DBTest, WriteSyncError) { + // Check that log sync errors cause the DB to disallow future writes. + + // (a) Cause log sync calls to fail + Options options = CurrentOptions(); + options.env = env_; + Reopen(&options); + env_->data_sync_error_.Release_Store(env_); + + // (b) Normal write should succeed + WriteOptions w; + ASSERT_OK(db_->Put(w, "k1", "v1")); + ASSERT_EQ("v1", Get("k1")); + + // (c) Do a sync write; should fail + w.sync = true; + ASSERT_TRUE(!db_->Put(w, "k2", "v2").ok()); + ASSERT_EQ("v1", Get("k1")); + ASSERT_EQ("NOT_FOUND", Get("k2")); + + // (d) make sync behave normally + env_->data_sync_error_.Release_Store(NULL); + + // (e) Do a non-sync write; should fail + w.sync = false; + ASSERT_TRUE(!db_->Put(w, "k3", "v3").ok()); + ASSERT_EQ("v1", Get("k1")); + ASSERT_EQ("NOT_FOUND", Get("k2")); + ASSERT_EQ("NOT_FOUND", Get("k3")); +} + TEST(DBTest, ManifestWriteError) { // Test for the following problem: // (a) Compaction produces file F @@ -1632,6 +1652,22 @@ TEST(DBTest, MissingSSTFile) { << s.ToString(); } +TEST(DBTest, StillReadSST) { + ASSERT_OK(Put("foo", "bar")); + ASSERT_EQ("bar", Get("foo")); + + // Dump the memtable to disk. + dbfull()->TEST_CompactMemTable(); + ASSERT_EQ("bar", Get("foo")); + Close(); + ASSERT_GT(RenameLDBToSST(), 0); + Options options = CurrentOptions(); + options.paranoid_checks = true; + Status s = TryReopen(&options); + ASSERT_TRUE(s.ok()); + ASSERT_EQ("bar", Get("foo")); +} + TEST(DBTest, FilesDeletedAfterCompaction) { ASSERT_OK(Put("foo", "v2")); Compact("a", "z"); @@ -1663,7 +1699,7 @@ TEST(DBTest, BloomFilter) { dbfull()->TEST_CompactMemTable(); // Prevent auto compactions triggered by seeks - env_->delay_sstable_sync_.Release_Store(env_); + env_->delay_data_sync_.Release_Store(env_); // Lookup present keys. Should rarely read from small sstable. env_->random_read_counter_.Reset(); @@ -1684,7 +1720,7 @@ TEST(DBTest, BloomFilter) { fprintf(stderr, "%d missing => %d reads\n", N, reads); ASSERT_LE(reads, 3*N/100); - env_->delay_sstable_sync_.Release_Store(NULL); + env_->delay_data_sync_.Release_Store(NULL); Close(); delete options.block_cache; delete options.filter_policy; @@ -1744,7 +1780,7 @@ static void MTThreadBody(void* arg) { ASSERT_EQ(k, key); ASSERT_GE(w, 0); ASSERT_LT(w, kNumThreads); - ASSERT_LE(c, reinterpret_cast<uintptr_t>( + ASSERT_LE(static_cast<uintptr_t>(c), reinterpret_cast<uintptr_t>( t->state->counter[w].Acquire_Load())); } } diff --git a/src/leveldb/db/filename.cc b/src/leveldb/db/filename.cc index 3c4d49f64e..da32946d99 100644 --- a/src/leveldb/db/filename.cc +++ b/src/leveldb/db/filename.cc @@ -31,6 +31,11 @@ std::string LogFileName(const std::string& name, uint64_t number) { std::string TableFileName(const std::string& name, uint64_t number) { assert(number > 0); + return MakeFileName(name, number, "ldb"); +} + +std::string SSTTableFileName(const std::string& name, uint64_t number) { + assert(number > 0); return MakeFileName(name, number, "sst"); } @@ -71,7 +76,7 @@ std::string OldInfoLogFileName(const std::string& dbname) { // dbname/LOG // dbname/LOG.old // dbname/MANIFEST-[0-9]+ -// dbname/[0-9]+.(log|sst) +// dbname/[0-9]+.(log|sst|ldb) bool ParseFileName(const std::string& fname, uint64_t* number, FileType* type) { @@ -106,7 +111,7 @@ bool ParseFileName(const std::string& fname, Slice suffix = rest; if (suffix == Slice(".log")) { *type = kLogFile; - } else if (suffix == Slice(".sst")) { + } else if (suffix == Slice(".sst") || suffix == Slice(".ldb")) { *type = kTableFile; } else if (suffix == Slice(".dbtmp")) { *type = kTempFile; diff --git a/src/leveldb/db/filename.h b/src/leveldb/db/filename.h index d5d09b1146..87a752605d 100644 --- a/src/leveldb/db/filename.h +++ b/src/leveldb/db/filename.h @@ -37,6 +37,11 @@ extern std::string LogFileName(const std::string& dbname, uint64_t number); // "dbname". extern std::string TableFileName(const std::string& dbname, uint64_t number); +// Return the legacy file name for an sstable with the specified number +// in the db named by "dbname". The result will be prefixed with +// "dbname". +extern std::string SSTTableFileName(const std::string& dbname, uint64_t number); + // Return the name of the descriptor file for the db named by // "dbname" and the specified incarnation number. The result will be // prefixed with "dbname". diff --git a/src/leveldb/db/filename_test.cc b/src/leveldb/db/filename_test.cc index 5a26da4728..a32556deaf 100644 --- a/src/leveldb/db/filename_test.cc +++ b/src/leveldb/db/filename_test.cc @@ -27,6 +27,7 @@ TEST(FileNameTest, Parse) { { "100.log", 100, kLogFile }, { "0.log", 0, kLogFile }, { "0.sst", 0, kTableFile }, + { "0.ldb", 0, kTableFile }, { "CURRENT", 0, kCurrentFile }, { "LOCK", 0, kDBLockFile }, { "MANIFEST-2", 2, kDescriptorFile }, diff --git a/src/leveldb/db/repair.cc b/src/leveldb/db/repair.cc index 022d52f3de..96c9b37af1 100644 --- a/src/leveldb/db/repair.cc +++ b/src/leveldb/db/repair.cc @@ -244,60 +244,133 @@ class Repairer { void ExtractMetaData() { std::vector<TableInfo> kept; for (size_t i = 0; i < table_numbers_.size(); i++) { - TableInfo t; - t.meta.number = table_numbers_[i]; - Status status = ScanTable(&t); - if (!status.ok()) { - std::string fname = TableFileName(dbname_, table_numbers_[i]); - Log(options_.info_log, "Table #%llu: ignoring %s", - (unsigned long long) table_numbers_[i], - status.ToString().c_str()); - ArchiveFile(fname); - } else { - tables_.push_back(t); - } + ScanTable(table_numbers_[i]); } } - Status ScanTable(TableInfo* t) { - std::string fname = TableFileName(dbname_, t->meta.number); + Iterator* NewTableIterator(const FileMetaData& meta) { + // Same as compaction iterators: if paranoid_checks are on, turn + // on checksum verification. + ReadOptions r; + r.verify_checksums = options_.paranoid_checks; + return table_cache_->NewIterator(r, meta.number, meta.file_size); + } + + void ScanTable(uint64_t number) { + TableInfo t; + t.meta.number = number; + std::string fname = TableFileName(dbname_, number); + Status status = env_->GetFileSize(fname, &t.meta.file_size); + if (!status.ok()) { + // Try alternate file name. + fname = SSTTableFileName(dbname_, number); + Status s2 = env_->GetFileSize(fname, &t.meta.file_size); + if (s2.ok()) { + status = Status::OK(); + } + } + if (!status.ok()) { + ArchiveFile(TableFileName(dbname_, number)); + ArchiveFile(SSTTableFileName(dbname_, number)); + Log(options_.info_log, "Table #%llu: dropped: %s", + (unsigned long long) t.meta.number, + status.ToString().c_str()); + return; + } + + // Extract metadata by scanning through table. int counter = 0; - Status status = env_->GetFileSize(fname, &t->meta.file_size); - if (status.ok()) { - Iterator* iter = table_cache_->NewIterator( - ReadOptions(), t->meta.number, t->meta.file_size); - bool empty = true; - ParsedInternalKey parsed; - t->max_sequence = 0; - for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { - Slice key = iter->key(); - if (!ParseInternalKey(key, &parsed)) { - Log(options_.info_log, "Table #%llu: unparsable key %s", - (unsigned long long) t->meta.number, - EscapeString(key).c_str()); - continue; - } + Iterator* iter = NewTableIterator(t.meta); + bool empty = true; + ParsedInternalKey parsed; + t.max_sequence = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + Slice key = iter->key(); + if (!ParseInternalKey(key, &parsed)) { + Log(options_.info_log, "Table #%llu: unparsable key %s", + (unsigned long long) t.meta.number, + EscapeString(key).c_str()); + continue; + } - counter++; - if (empty) { - empty = false; - t->meta.smallest.DecodeFrom(key); - } - t->meta.largest.DecodeFrom(key); - if (parsed.sequence > t->max_sequence) { - t->max_sequence = parsed.sequence; - } + counter++; + if (empty) { + empty = false; + t.meta.smallest.DecodeFrom(key); } - if (!iter->status().ok()) { - status = iter->status(); + t.meta.largest.DecodeFrom(key); + if (parsed.sequence > t.max_sequence) { + t.max_sequence = parsed.sequence; } - delete iter; } + if (!iter->status().ok()) { + status = iter->status(); + } + delete iter; Log(options_.info_log, "Table #%llu: %d entries %s", - (unsigned long long) t->meta.number, + (unsigned long long) t.meta.number, counter, status.ToString().c_str()); - return status; + + if (status.ok()) { + tables_.push_back(t); + } else { + RepairTable(fname, t); // RepairTable archives input file. + } + } + + void RepairTable(const std::string& src, TableInfo t) { + // We will copy src contents to a new table and then rename the + // new table over the source. + + // Create builder. + std::string copy = TableFileName(dbname_, next_file_number_++); + WritableFile* file; + Status s = env_->NewWritableFile(copy, &file); + if (!s.ok()) { + return; + } + TableBuilder* builder = new TableBuilder(options_, file); + + // Copy data. + Iterator* iter = NewTableIterator(t.meta); + int counter = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + builder->Add(iter->key(), iter->value()); + counter++; + } + delete iter; + + ArchiveFile(src); + if (counter == 0) { + builder->Abandon(); // Nothing to save + } else { + s = builder->Finish(); + if (s.ok()) { + t.meta.file_size = builder->FileSize(); + } + } + delete builder; + builder = NULL; + + if (s.ok()) { + s = file->Close(); + } + delete file; + file = NULL; + + if (counter > 0 && s.ok()) { + std::string orig = TableFileName(dbname_, t.meta.number); + s = env_->RenameFile(copy, orig); + if (s.ok()) { + Log(options_.info_log, "Table #%llu: %d entries repaired", + (unsigned long long) t.meta.number, counter); + tables_.push_back(t); + } + } + if (!s.ok()) { + env_->DeleteFile(copy); + } } Status WriteDescriptor() { diff --git a/src/leveldb/db/table_cache.cc b/src/leveldb/db/table_cache.cc index 497db27076..e3d82cd3ea 100644 --- a/src/leveldb/db/table_cache.cc +++ b/src/leveldb/db/table_cache.cc @@ -54,6 +54,12 @@ Status TableCache::FindTable(uint64_t file_number, uint64_t file_size, RandomAccessFile* file = NULL; Table* table = NULL; s = env_->NewRandomAccessFile(fname, &file); + if (!s.ok()) { + std::string old_fname = SSTTableFileName(dbname_, file_number); + if (env_->NewRandomAccessFile(old_fname, &file).ok()) { + s = Status::OK(); + } + } if (s.ok()) { s = Table::Open(*options_, file, file_size, &table); } diff --git a/src/leveldb/db/version_set.cc b/src/leveldb/db/version_set.cc index 66d73be71f..517edd3b18 100644 --- a/src/leveldb/db/version_set.cc +++ b/src/leveldb/db/version_set.cc @@ -876,12 +876,6 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) { } if (!s.ok()) { Log(options_->info_log, "MANIFEST write: %s\n", s.ToString().c_str()); - if (ManifestContains(record)) { - Log(options_->info_log, - "MANIFEST contains log record despite error; advancing to new " - "version to prevent mismatch between in-memory and logged state"); - s = Status::OK(); - } } } @@ -889,8 +883,6 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) { // new CURRENT file that points to it. if (s.ok() && !new_manifest_file.empty()) { s = SetCurrentFile(env_, dbname_, manifest_file_number_); - // No need to double-check MANIFEST in case of error since it - // will be discarded below. } mu->Lock(); @@ -1124,31 +1116,6 @@ const char* VersionSet::LevelSummary(LevelSummaryStorage* scratch) const { return scratch->buffer; } -// Return true iff the manifest contains the specified record. -bool VersionSet::ManifestContains(const std::string& record) const { - std::string fname = DescriptorFileName(dbname_, manifest_file_number_); - Log(options_->info_log, "ManifestContains: checking %s\n", fname.c_str()); - SequentialFile* file = NULL; - Status s = env_->NewSequentialFile(fname, &file); - if (!s.ok()) { - Log(options_->info_log, "ManifestContains: %s\n", s.ToString().c_str()); - return false; - } - log::Reader reader(file, NULL, true/*checksum*/, 0); - Slice r; - std::string scratch; - bool result = false; - while (reader.ReadRecord(&r, &scratch)) { - if (r == Slice(record)) { - result = true; - break; - } - } - delete file; - Log(options_->info_log, "ManifestContains: result = %d\n", result ? 1 : 0); - return result; -} - uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) { uint64_t result = 0; for (int level = 0; level < config::kNumLevels; level++) { diff --git a/src/leveldb/db/version_set.h b/src/leveldb/db/version_set.h index 20de0e2629..8dc14b8e01 100644 --- a/src/leveldb/db/version_set.h +++ b/src/leveldb/db/version_set.h @@ -292,8 +292,6 @@ class VersionSet { void AppendVersion(Version* v); - bool ManifestContains(const std::string& record) const; - Env* const env_; const std::string dbname_; const Options* const options_; diff --git a/src/leveldb/doc/impl.html b/src/leveldb/doc/impl.html index e870795d23..28817fe0da 100644 --- a/src/leveldb/doc/impl.html +++ b/src/leveldb/doc/impl.html @@ -11,7 +11,7 @@ The implementation of leveldb is similar in spirit to the representation of a single -<a href="http://labs.google.com/papers/bigtable.html"> +<a href="http://research.google.com/archive/bigtable.html"> Bigtable tablet (section 5.3)</a>. However the organization of the files that make up the representation is somewhat different and is explained below. diff --git a/src/leveldb/include/leveldb/db.h b/src/leveldb/include/leveldb/db.h index 57c00a5da0..5ffb29d526 100644 --- a/src/leveldb/include/leveldb/db.h +++ b/src/leveldb/include/leveldb/db.h @@ -14,7 +14,7 @@ namespace leveldb { // Update Makefile if you change these static const int kMajorVersion = 1; -static const int kMinorVersion = 13; +static const int kMinorVersion = 15; struct Options; struct ReadOptions; diff --git a/src/leveldb/include/leveldb/env.h b/src/leveldb/include/leveldb/env.h index fa32289f58..b2072d02c1 100644 --- a/src/leveldb/include/leveldb/env.h +++ b/src/leveldb/include/leveldb/env.h @@ -13,9 +13,9 @@ #ifndef STORAGE_LEVELDB_INCLUDE_ENV_H_ #define STORAGE_LEVELDB_INCLUDE_ENV_H_ -#include <cstdarg> #include <string> #include <vector> +#include <stdarg.h> #include <stdint.h> #include "leveldb/status.h" diff --git a/src/leveldb/issues/issue200_test.cc b/src/leveldb/issues/issue200_test.cc new file mode 100644 index 0000000000..1cec79f443 --- /dev/null +++ b/src/leveldb/issues/issue200_test.cc @@ -0,0 +1,59 @@ +// Copyright (c) 2013 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +// Test for issue 200: when iterator switches direction from backward +// to forward, the current key can be yielded unexpectedly if a new +// mutation has been added just before the current key. + +#include "leveldb/db.h" +#include "util/testharness.h" + +namespace leveldb { + +class Issue200 { }; + +TEST(Issue200, Test) { + // Get rid of any state from an old run. + std::string dbpath = test::TmpDir() + "/leveldb_issue200_test"; + DestroyDB(dbpath, Options()); + + DB *db; + Options options; + options.create_if_missing = true; + ASSERT_OK(DB::Open(options, dbpath, &db)); + + WriteOptions write_options; + ASSERT_OK(db->Put(write_options, "1", "b")); + ASSERT_OK(db->Put(write_options, "2", "c")); + ASSERT_OK(db->Put(write_options, "3", "d")); + ASSERT_OK(db->Put(write_options, "4", "e")); + ASSERT_OK(db->Put(write_options, "5", "f")); + + ReadOptions read_options; + Iterator *iter = db->NewIterator(read_options); + + // Add an element that should not be reflected in the iterator. + ASSERT_OK(db->Put(write_options, "25", "cd")); + + iter->Seek("5"); + ASSERT_EQ(iter->key().ToString(), "5"); + iter->Prev(); + ASSERT_EQ(iter->key().ToString(), "4"); + iter->Prev(); + ASSERT_EQ(iter->key().ToString(), "3"); + iter->Next(); + ASSERT_EQ(iter->key().ToString(), "4"); + iter->Next(); + ASSERT_EQ(iter->key().ToString(), "5"); + + delete iter; + delete db; + DestroyDB(dbpath, options); +} + +} // namespace leveldb + +int main(int argc, char** argv) { + return leveldb::test::RunAllTests(); +} diff --git a/src/leveldb/port/atomic_pointer.h b/src/leveldb/port/atomic_pointer.h index e17bf435ea..a9866b2302 100644 --- a/src/leveldb/port/atomic_pointer.h +++ b/src/leveldb/port/atomic_pointer.h @@ -50,6 +50,13 @@ namespace port { // http://msdn.microsoft.com/en-us/library/ms684208(v=vs.85).aspx #define LEVELDB_HAVE_MEMORY_BARRIER +// Mac OS +#elif defined(OS_MACOSX) +inline void MemoryBarrier() { + OSMemoryBarrier(); +} +#define LEVELDB_HAVE_MEMORY_BARRIER + // Gcc on x86 #elif defined(ARCH_CPU_X86_FAMILY) && defined(__GNUC__) inline void MemoryBarrier() { @@ -68,13 +75,6 @@ inline void MemoryBarrier() { } #define LEVELDB_HAVE_MEMORY_BARRIER -// Mac OS -#elif defined(OS_MACOSX) -inline void MemoryBarrier() { - OSMemoryBarrier(); -} -#define LEVELDB_HAVE_MEMORY_BARRIER - // ARM Linux #elif defined(ARCH_CPU_ARM_FAMILY) && defined(__linux__) typedef void (*LinuxKernelMemoryBarrierFunc)(void); diff --git a/src/leveldb/table/filter_block_test.cc b/src/leveldb/table/filter_block_test.cc index 3a2a07cf53..8c4a4741f2 100644 --- a/src/leveldb/table/filter_block_test.cc +++ b/src/leveldb/table/filter_block_test.cc @@ -29,7 +29,7 @@ class TestHashFilter : public FilterPolicy { virtual bool KeyMayMatch(const Slice& key, const Slice& filter) const { uint32_t h = Hash(key.data(), key.size(), 1); - for (int i = 0; i + 4 <= filter.size(); i += 4) { + for (size_t i = 0; i + 4 <= filter.size(); i += 4) { if (h == DecodeFixed32(filter.data() + i)) { return true; } diff --git a/src/leveldb/util/arena.cc b/src/leveldb/util/arena.cc index 9551d6a3a2..9367f71492 100644 --- a/src/leveldb/util/arena.cc +++ b/src/leveldb/util/arena.cc @@ -40,7 +40,7 @@ char* Arena::AllocateFallback(size_t bytes) { } char* Arena::AllocateAligned(size_t bytes) { - const int align = sizeof(void*); // We'll align to pointer size + const int align = (sizeof(void*) > 8) ? sizeof(void*) : 8; assert((align & (align-1)) == 0); // Pointer size should be a power of 2 size_t current_mod = reinterpret_cast<uintptr_t>(alloc_ptr_) & (align-1); size_t slop = (current_mod == 0 ? 0 : align - current_mod); diff --git a/src/leveldb/util/arena.h b/src/leveldb/util/arena.h index 8f7dde226c..73bbf1cb9b 100644 --- a/src/leveldb/util/arena.h +++ b/src/leveldb/util/arena.h @@ -5,9 +5,9 @@ #ifndef STORAGE_LEVELDB_UTIL_ARENA_H_ #define STORAGE_LEVELDB_UTIL_ARENA_H_ -#include <cstddef> #include <vector> #include <assert.h> +#include <stddef.h> #include <stdint.h> namespace leveldb { diff --git a/src/leveldb/util/arena_test.cc b/src/leveldb/util/arena_test.cc index 63d1778034..58e870ec44 100644 --- a/src/leveldb/util/arena_test.cc +++ b/src/leveldb/util/arena_test.cc @@ -40,7 +40,7 @@ TEST(ArenaTest, Simple) { r = arena.Allocate(s); } - for (int b = 0; b < s; b++) { + for (size_t b = 0; b < s; b++) { // Fill the "i"th allocation with a known bit pattern r[b] = i % 256; } @@ -51,10 +51,10 @@ TEST(ArenaTest, Simple) { ASSERT_LE(arena.MemoryUsage(), bytes * 1.10); } } - for (int i = 0; i < allocated.size(); i++) { + for (size_t i = 0; i < allocated.size(); i++) { size_t num_bytes = allocated[i].first; const char* p = allocated[i].second; - for (int b = 0; b < num_bytes; b++) { + for (size_t b = 0; b < num_bytes; b++) { // Check the "i"th allocation for the known bit pattern ASSERT_EQ(int(p[b]) & 0xff, i % 256); } diff --git a/src/leveldb/util/bloom_test.cc b/src/leveldb/util/bloom_test.cc index 0bf8e8d6eb..77fb1b3159 100644 --- a/src/leveldb/util/bloom_test.cc +++ b/src/leveldb/util/bloom_test.cc @@ -126,7 +126,8 @@ TEST(BloomTest, VaryingLengths) { } Build(); - ASSERT_LE(FilterSize(), (length * 10 / 8) + 40) << length; + ASSERT_LE(FilterSize(), static_cast<size_t>((length * 10 / 8) + 40)) + << length; // All added keys must match for (int i = 0; i < length; i++) { diff --git a/src/leveldb/util/coding_test.cc b/src/leveldb/util/coding_test.cc index fb5726e335..521541ea61 100644 --- a/src/leveldb/util/coding_test.cc +++ b/src/leveldb/util/coding_test.cc @@ -112,13 +112,13 @@ TEST(Coding, Varint64) { } std::string s; - for (int i = 0; i < values.size(); i++) { + for (size_t i = 0; i < values.size(); i++) { PutVarint64(&s, values[i]); } const char* p = s.data(); const char* limit = p + s.size(); - for (int i = 0; i < values.size(); i++) { + for (size_t i = 0; i < values.size(); i++) { ASSERT_TRUE(p < limit); uint64_t actual; const char* start = p; @@ -143,7 +143,7 @@ TEST(Coding, Varint32Truncation) { std::string s; PutVarint32(&s, large_value); uint32_t result; - for (int len = 0; len < s.size() - 1; len++) { + for (size_t len = 0; len < s.size() - 1; len++) { ASSERT_TRUE(GetVarint32Ptr(s.data(), s.data() + len, &result) == NULL); } ASSERT_TRUE(GetVarint32Ptr(s.data(), s.data() + s.size(), &result) != NULL); @@ -162,7 +162,7 @@ TEST(Coding, Varint64Truncation) { std::string s; PutVarint64(&s, large_value); uint64_t result; - for (int len = 0; len < s.size() - 1; len++) { + for (size_t len = 0; len < s.size() - 1; len++) { ASSERT_TRUE(GetVarint64Ptr(s.data(), s.data() + len, &result) == NULL); } ASSERT_TRUE(GetVarint64Ptr(s.data(), s.data() + s.size(), &result) != NULL); diff --git a/src/leveldb/util/env_posix.cc b/src/leveldb/util/env_posix.cc index 0f5dcfac5a..93eadb1a4f 100644 --- a/src/leveldb/util/env_posix.cc +++ b/src/leveldb/util/env_posix.cc @@ -176,147 +176,43 @@ class PosixMmapReadableFile: public RandomAccessFile { } }; -// We preallocate up to an extra megabyte and use memcpy to append new -// data to the file. This is safe since we either properly close the -// file before reading from it, or for log files, the reading code -// knows enough to skip zero suffixes. -class PosixMmapFile : public WritableFile { +class PosixWritableFile : public WritableFile { private: std::string filename_; - int fd_; - size_t page_size_; - size_t map_size_; // How much extra memory to map at a time - char* base_; // The mapped region - char* limit_; // Limit of the mapped region - char* dst_; // Where to write next (in range [base_,limit_]) - char* last_sync_; // Where have we synced up to - uint64_t file_offset_; // Offset of base_ in file - - // Have we done an munmap of unsynced data? - bool pending_sync_; - - // Roundup x to a multiple of y - static size_t Roundup(size_t x, size_t y) { - return ((x + y - 1) / y) * y; - } - - size_t TruncateToPageBoundary(size_t s) { - s -= (s & (page_size_ - 1)); - assert((s % page_size_) == 0); - return s; - } - - bool UnmapCurrentRegion() { - bool result = true; - if (base_ != NULL) { - if (last_sync_ < limit_) { - // Defer syncing this data until next Sync() call, if any - pending_sync_ = true; - } - if (munmap(base_, limit_ - base_) != 0) { - result = false; - } - file_offset_ += limit_ - base_; - base_ = NULL; - limit_ = NULL; - last_sync_ = NULL; - dst_ = NULL; - - // Increase the amount we map the next time, but capped at 1MB - if (map_size_ < (1<<20)) { - map_size_ *= 2; - } - } - return result; - } - - bool MapNewRegion() { - assert(base_ == NULL); - if (ftruncate(fd_, file_offset_ + map_size_) < 0) { - return false; - } - void* ptr = mmap(NULL, map_size_, PROT_READ | PROT_WRITE, MAP_SHARED, - fd_, file_offset_); - if (ptr == MAP_FAILED) { - return false; - } - base_ = reinterpret_cast<char*>(ptr); - limit_ = base_ + map_size_; - dst_ = base_; - last_sync_ = base_; - return true; - } + FILE* file_; public: - PosixMmapFile(const std::string& fname, int fd, size_t page_size) - : filename_(fname), - fd_(fd), - page_size_(page_size), - map_size_(Roundup(65536, page_size)), - base_(NULL), - limit_(NULL), - dst_(NULL), - last_sync_(NULL), - file_offset_(0), - pending_sync_(false) { - assert((page_size & (page_size - 1)) == 0); - } - - - ~PosixMmapFile() { - if (fd_ >= 0) { - PosixMmapFile::Close(); + PosixWritableFile(const std::string& fname, FILE* f) + : filename_(fname), file_(f) { } + + ~PosixWritableFile() { + if (file_ != NULL) { + // Ignoring any potential errors + fclose(file_); } } virtual Status Append(const Slice& data) { - const char* src = data.data(); - size_t left = data.size(); - while (left > 0) { - assert(base_ <= dst_); - assert(dst_ <= limit_); - size_t avail = limit_ - dst_; - if (avail == 0) { - if (!UnmapCurrentRegion() || - !MapNewRegion()) { - return IOError(filename_, errno); - } - } - - size_t n = (left <= avail) ? left : avail; - memcpy(dst_, src, n); - dst_ += n; - src += n; - left -= n; + size_t r = fwrite_unlocked(data.data(), 1, data.size(), file_); + if (r != data.size()) { + return IOError(filename_, errno); } return Status::OK(); } virtual Status Close() { - Status s; - size_t unused = limit_ - dst_; - if (!UnmapCurrentRegion()) { - s = IOError(filename_, errno); - } else if (unused > 0) { - // Trim the extra space at the end of the file - if (ftruncate(fd_, file_offset_ - unused) < 0) { - s = IOError(filename_, errno); - } - } - - if (close(fd_) < 0) { - if (s.ok()) { - s = IOError(filename_, errno); - } + Status result; + if (fclose(file_) != 0) { + result = IOError(filename_, errno); } - - fd_ = -1; - base_ = NULL; - limit_ = NULL; - return s; + file_ = NULL; + return result; } virtual Status Flush() { + if (fflush_unlocked(file_) != 0) { + return IOError(filename_, errno); + } return Status::OK(); } @@ -353,26 +249,10 @@ class PosixMmapFile : public WritableFile { if (!s.ok()) { return s; } - - if (pending_sync_) { - // Some unmapped data was not synced - pending_sync_ = false; - if (fdatasync(fd_) < 0) { - s = IOError(filename_, errno); - } + if (fflush_unlocked(file_) != 0 || + fdatasync(fileno(file_)) != 0) { + s = Status::IOError(filename_, strerror(errno)); } - - if (dst_ > last_sync_) { - // Find the beginnings of the pages that contain the first and last - // bytes to be synced. - size_t p1 = TruncateToPageBoundary(last_sync_ - base_); - size_t p2 = TruncateToPageBoundary(dst_ - base_ - 1); - last_sync_ = dst_; - if (msync(base_ + p1, p2 - p1 + page_size_, MS_SYNC) < 0) { - s = IOError(filename_, errno); - } - } - return s; } }; @@ -463,12 +343,12 @@ class PosixEnv : public Env { virtual Status NewWritableFile(const std::string& fname, WritableFile** result) { Status s; - const int fd = open(fname.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644); - if (fd < 0) { + FILE* f = fopen(fname.c_str(), "w"); + if (f == NULL) { *result = NULL; s = IOError(fname, errno); } else { - *result = new PosixMmapFile(fname, fd, page_size_); + *result = new PosixWritableFile(fname, f); } return s; } @@ -631,7 +511,6 @@ class PosixEnv : public Env { return NULL; } - size_t page_size_; pthread_mutex_t mu_; pthread_cond_t bgsignal_; pthread_t bgthread_; @@ -646,8 +525,7 @@ class PosixEnv : public Env { MmapLimiter mmap_limit_; }; -PosixEnv::PosixEnv() : page_size_(getpagesize()), - started_bgthread_(false) { +PosixEnv::PosixEnv() : started_bgthread_(false) { PthreadCall("mutex_init", pthread_mutex_init(&mu_, NULL)); PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, NULL)); } diff --git a/src/leveldb/util/testharness.cc b/src/leveldb/util/testharness.cc index eb1bdd554a..402fab34d7 100644 --- a/src/leveldb/util/testharness.cc +++ b/src/leveldb/util/testharness.cc @@ -38,7 +38,7 @@ int RunAllTests() { int num = 0; if (tests != NULL) { - for (int i = 0; i < tests->size(); i++) { + for (size_t i = 0; i < tests->size(); i++) { const Test& t = (*tests)[i]; if (matcher != NULL) { std::string name = t.base; diff --git a/src/leveldb/util/testutil.cc b/src/leveldb/util/testutil.cc index 538d09516d..bee56bf75f 100644 --- a/src/leveldb/util/testutil.cc +++ b/src/leveldb/util/testutil.cc @@ -32,7 +32,7 @@ std::string RandomKey(Random* rnd, int len) { extern Slice CompressibleString(Random* rnd, double compressed_fraction, - int len, std::string* dst) { + size_t len, std::string* dst) { int raw = static_cast<int>(len * compressed_fraction); if (raw < 1) raw = 1; std::string raw_data; diff --git a/src/leveldb/util/testutil.h b/src/leveldb/util/testutil.h index 824e655bd2..adad3fc1ea 100644 --- a/src/leveldb/util/testutil.h +++ b/src/leveldb/util/testutil.h @@ -24,7 +24,7 @@ extern std::string RandomKey(Random* rnd, int len); // "N*compressed_fraction" bytes and return a Slice that references // the generated data. extern Slice CompressibleString(Random* rnd, double compressed_fraction, - int len, std::string* dst); + size_t len, std::string* dst); // A wrapper that allows injection of errors. class ErrorEnv : public EnvWrapper { |