diff options
Diffstat (limited to 'src/leveldb')
28 files changed, 661 insertions, 126 deletions
diff --git a/src/leveldb/.gitignore b/src/leveldb/.gitignore index 55ba072e0e..71d87a4eeb 100644 --- a/src/leveldb/.gitignore +++ b/src/leveldb/.gitignore @@ -6,6 +6,7 @@ build_config.mk *.so.* *_test db_bench +leveldbutil Release Debug Benchmark diff --git a/src/leveldb/AUTHORS b/src/leveldb/AUTHORS index 27a9407e52..fc40194ab9 100644 --- a/src/leveldb/AUTHORS +++ b/src/leveldb/AUTHORS @@ -6,3 +6,6 @@ Google Inc. # Initial version authors: Jeffrey Dean <jeff@google.com> Sanjay Ghemawat <sanjay@google.com> + +# Partial list of contributors: +Kevin Regan <kevin.d.regan@gmail.com> diff --git a/src/leveldb/Makefile b/src/leveldb/Makefile index 42c4952fec..20c9c4f287 100644 --- a/src/leveldb/Makefile +++ b/src/leveldb/Makefile @@ -12,7 +12,7 @@ OPT ?= -O2 -DNDEBUG # (A) Production use (optimized mode) #----------------------------------------------- # detect what platform we're building on -$(shell CC=$(CC) CXX=$(CXX) TARGET_OS=$(TARGET_OS) \ +$(shell CC="$(CC)" CXX="$(CXX)" TARGET_OS="$(TARGET_OS)" \ ./build_detect_platform build_config.mk ./) # this file is generated by the previous line to set build flags and sources include build_config.mk @@ -31,6 +31,7 @@ TESTHARNESS = ./util/testharness.o $(TESTUTIL) TESTS = \ arena_test \ + autocompact_test \ bloom_test \ c_test \ cache_test \ @@ -42,6 +43,7 @@ TESTS = \ env_test \ filename_test \ filter_block_test \ + issue178_test \ log_test \ memenv_test \ skiplist_test \ @@ -69,7 +71,7 @@ SHARED = $(SHARED1) else # Update db.h if you change these. SHARED_MAJOR = 1 -SHARED_MINOR = 9 +SHARED_MINOR = 13 SHARED1 = libleveldb.$(PLATFORM_SHARED_EXT) SHARED2 = $(SHARED1).$(SHARED_MAJOR) SHARED3 = $(SHARED1).$(SHARED_MAJOR).$(SHARED_MINOR) @@ -113,6 +115,9 @@ leveldbutil: db/leveldb_main.o $(LIBOBJECTS) arena_test: util/arena_test.o $(LIBOBJECTS) $(TESTHARNESS) $(CXX) $(LDFLAGS) util/arena_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS) +autocompact_test: db/autocompact_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(CXX) $(LDFLAGS) db/autocompact_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS) + bloom_test: util/bloom_test.o $(LIBOBJECTS) $(TESTHARNESS) $(CXX) $(LDFLAGS) util/bloom_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS) @@ -146,6 +151,9 @@ filename_test: db/filename_test.o $(LIBOBJECTS) $(TESTHARNESS) filter_block_test: table/filter_block_test.o $(LIBOBJECTS) $(TESTHARNESS) $(CXX) $(LDFLAGS) table/filter_block_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS) +issue178_test: issues/issue178_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(CXX) $(LDFLAGS) issues/issue178_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS) + log_test: db/log_test.o $(LIBOBJECTS) $(TESTHARNESS) $(CXX) $(LDFLAGS) db/log_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS) diff --git a/src/leveldb/build_detect_platform b/src/leveldb/build_detect_platform index 609cb51224..bdfd64172c 100755 --- a/src/leveldb/build_detect_platform +++ b/src/leveldb/build_detect_platform @@ -44,6 +44,10 @@ if test -z "$CXX"; then CXX=g++ fi +if test -z "$TMPDIR"; then + TMPDIR=/tmp +fi + # Detect OS if test -z "$TARGET_OS"; then TARGET_OS=`uname -s` @@ -94,6 +98,12 @@ case "$TARGET_OS" in PLATFORM_LIBS="-lpthread" PORT_FILE=port/port_posix.cc ;; + GNU/kFreeBSD) + PLATFORM=OS_KFREEBSD + COMMON_FLAGS="$MEMCMP_FLAG -D_REENTRANT -DOS_KFREEBSD" + PLATFORM_LIBS="-lpthread" + PORT_FILE=port/port_posix.cc + ;; NetBSD) PLATFORM=OS_NETBSD COMMON_FLAGS="$MEMCMP_FLAG -D_REENTRANT -DOS_NETBSD" @@ -163,8 +173,10 @@ if [ "$CROSS_COMPILE" = "true" ]; then # Cross-compiling; do not try any compilation tests. true else + CXXOUTPUT="${TMPDIR}/leveldb_build_detect_platform-cxx.$$" + # If -std=c++0x works, use <cstdatomic>. Otherwise use port_posix.h. - $CXX $CXXFLAGS -std=c++0x -x c++ - -o /dev/null 2>/dev/null <<EOF + $CXX $CXXFLAGS -std=c++0x -x c++ - -o $CXXOUTPUT 2>/dev/null <<EOF #include <cstdatomic> int main() {} EOF @@ -176,12 +188,14 @@ EOF fi # Test whether tcmalloc is available - $CXX $CXXFLAGS -x c++ - -o /dev/null -ltcmalloc 2>/dev/null <<EOF + $CXX $CXXFLAGS -x c++ - -o $CXXOUTPUT -ltcmalloc 2>/dev/null <<EOF int main() {} EOF if [ "$?" = 0 ]; then PLATFORM_LIBS="$PLATFORM_LIBS -ltcmalloc" fi + + rm -f $CXXOUTPUT 2>/dev/null fi PLATFORM_CCFLAGS="$PLATFORM_CCFLAGS $COMMON_FLAGS" diff --git a/src/leveldb/db/autocompact_test.cc b/src/leveldb/db/autocompact_test.cc new file mode 100644 index 0000000000..d20a2362c3 --- /dev/null +++ b/src/leveldb/db/autocompact_test.cc @@ -0,0 +1,118 @@ +// Copyright (c) 2013 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "leveldb/db.h" +#include "db/db_impl.h" +#include "leveldb/cache.h" +#include "util/testharness.h" +#include "util/testutil.h" + +namespace leveldb { + +class AutoCompactTest { + public: + std::string dbname_; + Cache* tiny_cache_; + Options options_; + DB* db_; + + AutoCompactTest() { + dbname_ = test::TmpDir() + "/autocompact_test"; + tiny_cache_ = NewLRUCache(100); + options_.block_cache = tiny_cache_; + DestroyDB(dbname_, options_); + options_.create_if_missing = true; + options_.compression = kNoCompression; + ASSERT_OK(DB::Open(options_, dbname_, &db_)); + } + + ~AutoCompactTest() { + delete db_; + DestroyDB(dbname_, Options()); + delete tiny_cache_; + } + + std::string Key(int i) { + char buf[100]; + snprintf(buf, sizeof(buf), "key%06d", i); + return std::string(buf); + } + + uint64_t Size(const Slice& start, const Slice& limit) { + Range r(start, limit); + uint64_t size; + db_->GetApproximateSizes(&r, 1, &size); + return size; + } + + void DoReads(int n); +}; + +static const int kValueSize = 200 * 1024; +static const int kTotalSize = 100 * 1024 * 1024; +static const int kCount = kTotalSize / kValueSize; + +// Read through the first n keys repeatedly and check that they get +// compacted (verified by checking the size of the key space). +void AutoCompactTest::DoReads(int n) { + std::string value(kValueSize, 'x'); + DBImpl* dbi = reinterpret_cast<DBImpl*>(db_); + + // Fill database + for (int i = 0; i < kCount; i++) { + ASSERT_OK(db_->Put(WriteOptions(), Key(i), value)); + } + ASSERT_OK(dbi->TEST_CompactMemTable()); + + // Delete everything + for (int i = 0; i < kCount; i++) { + ASSERT_OK(db_->Delete(WriteOptions(), Key(i))); + } + ASSERT_OK(dbi->TEST_CompactMemTable()); + + // Get initial measurement of the space we will be reading. + const int64_t initial_size = Size(Key(0), Key(n)); + const int64_t initial_other_size = Size(Key(n), Key(kCount)); + + // Read until size drops significantly. + std::string limit_key = Key(n); + for (int read = 0; true; read++) { + ASSERT_LT(read, 100) << "Taking too long to compact"; + Iterator* iter = db_->NewIterator(ReadOptions()); + for (iter->SeekToFirst(); + iter->Valid() && iter->key().ToString() < limit_key; + iter->Next()) { + // Drop data + } + delete iter; + // Wait a little bit to allow any triggered compactions to complete. + Env::Default()->SleepForMicroseconds(1000000); + uint64_t size = Size(Key(0), Key(n)); + fprintf(stderr, "iter %3d => %7.3f MB [other %7.3f MB]\n", + read+1, size/1048576.0, Size(Key(n), Key(kCount))/1048576.0); + if (size <= initial_size/10) { + break; + } + } + + // Verify that the size of the key space not touched by the reads + // is pretty much unchanged. + const int64_t final_other_size = Size(Key(n), Key(kCount)); + ASSERT_LE(final_other_size, initial_other_size + 1048576); + ASSERT_GE(final_other_size, initial_other_size/5 - 1048576); +} + +TEST(AutoCompactTest, ReadAll) { + DoReads(kCount); +} + +TEST(AutoCompactTest, ReadHalf) { + DoReads(kCount/2); +} + +} // namespace leveldb + +int main(int argc, char** argv) { + return leveldb::test::RunAllTests(); +} diff --git a/src/leveldb/db/corruption_test.cc b/src/leveldb/db/corruption_test.cc index 31b2d5f416..b37ffdfe64 100644 --- a/src/leveldb/db/corruption_test.cc +++ b/src/leveldb/db/corruption_test.cc @@ -35,6 +35,7 @@ class CorruptionTest { CorruptionTest() { tiny_cache_ = NewLRUCache(100); options_.env = &env_; + options_.block_cache = tiny_cache_; dbname_ = test::TmpDir() + "/db_test"; DestroyDB(dbname_, options_); @@ -50,17 +51,14 @@ class CorruptionTest { delete tiny_cache_; } - Status TryReopen(Options* options = NULL) { + Status TryReopen() { delete db_; db_ = NULL; - Options opt = (options ? *options : options_); - opt.env = &env_; - opt.block_cache = tiny_cache_; - return DB::Open(opt, dbname_, &db_); + return DB::Open(options_, dbname_, &db_); } - void Reopen(Options* options = NULL) { - ASSERT_OK(TryReopen(options)); + void Reopen() { + ASSERT_OK(TryReopen()); } void RepairDB() { @@ -92,6 +90,10 @@ class CorruptionTest { for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { uint64_t key; Slice in(iter->key()); + if (in == "" || in == "~") { + // Ignore boundary keys. + continue; + } if (!ConsumeDecimalNumber(&in, &key) || !in.empty() || key < next_expected) { @@ -233,7 +235,7 @@ TEST(CorruptionTest, TableFile) { dbi->TEST_CompactRange(1, NULL, NULL); Corrupt(kTableFile, 100, 1); - Check(99, 99); + Check(90, 99); } TEST(CorruptionTest, TableFileIndexData) { @@ -299,7 +301,7 @@ TEST(CorruptionTest, CompactionInputError) { ASSERT_EQ(1, Property("leveldb.num-files-at-level" + NumberToString(last))); Corrupt(kTableFile, 100, 1); - Check(9, 9); + Check(5, 9); // Force compactions by writing lots of values Build(10000); @@ -307,32 +309,23 @@ TEST(CorruptionTest, CompactionInputError) { } TEST(CorruptionTest, CompactionInputErrorParanoid) { - Options options; - options.paranoid_checks = true; - options.write_buffer_size = 1048576; - Reopen(&options); + options_.paranoid_checks = true; + options_.write_buffer_size = 512 << 10; + Reopen(); DBImpl* dbi = reinterpret_cast<DBImpl*>(db_); - // Fill levels >= 1 so memtable compaction outputs to level 1 - for (int level = 1; level < config::kNumLevels; level++) { - dbi->Put(WriteOptions(), "", "begin"); - dbi->Put(WriteOptions(), "~", "end"); + // Make multiple inputs so we need to compact. + for (int i = 0; i < 2; i++) { + Build(10); dbi->TEST_CompactMemTable(); + Corrupt(kTableFile, 100, 1); + env_.SleepForMicroseconds(100000); } + dbi->CompactRange(NULL, NULL); - Build(10); - dbi->TEST_CompactMemTable(); - ASSERT_EQ(1, Property("leveldb.num-files-at-level0")); - - Corrupt(kTableFile, 100, 1); - Check(9, 9); - - // Write must eventually fail because of corrupted table - Status s; + // Write must fail because of corrupted table std::string tmp1, tmp2; - for (int i = 0; i < 10000 && s.ok(); i++) { - s = db_->Put(WriteOptions(), Key(i, &tmp1), Value(i, &tmp2)); - } + Status s = db_->Put(WriteOptions(), Key(5, &tmp1), Value(5, &tmp2)); ASSERT_TRUE(!s.ok()) << "write did not fail in corrupted paranoid db"; } diff --git a/src/leveldb/db/db_impl.cc b/src/leveldb/db/db_impl.cc index c9de169f29..fa1351038b 100644 --- a/src/leveldb/db/db_impl.cc +++ b/src/leveldb/db/db_impl.cc @@ -35,6 +35,8 @@ namespace leveldb { +const int kNumNonTableCacheFiles = 10; + // Information kept for every waiting writer struct DBImpl::Writer { Status status; @@ -92,9 +94,9 @@ Options SanitizeOptions(const std::string& dbname, Options result = src; result.comparator = icmp; result.filter_policy = (src.filter_policy != NULL) ? ipolicy : NULL; - ClipToRange(&result.max_open_files, 20, 50000); - ClipToRange(&result.write_buffer_size, 64<<10, 1<<30); - ClipToRange(&result.block_size, 1<<10, 4<<20); + ClipToRange(&result.max_open_files, 64 + kNumNonTableCacheFiles, 50000); + ClipToRange(&result.write_buffer_size, 64<<10, 1<<30); + ClipToRange(&result.block_size, 1<<10, 4<<20); if (result.info_log == NULL) { // Open a log file in the same directory as the db src.env->CreateDir(dbname); // In case it does not exist @@ -111,14 +113,14 @@ Options SanitizeOptions(const std::string& dbname, return result; } -DBImpl::DBImpl(const Options& options, const std::string& dbname) - : env_(options.env), - internal_comparator_(options.comparator), - internal_filter_policy_(options.filter_policy), - options_(SanitizeOptions( - dbname, &internal_comparator_, &internal_filter_policy_, options)), - owns_info_log_(options_.info_log != options.info_log), - owns_cache_(options_.block_cache != options.block_cache), +DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) + : env_(raw_options.env), + internal_comparator_(raw_options.comparator), + internal_filter_policy_(raw_options.filter_policy), + options_(SanitizeOptions(dbname, &internal_comparator_, + &internal_filter_policy_, raw_options)), + owns_info_log_(options_.info_log != raw_options.info_log), + owns_cache_(options_.block_cache != raw_options.block_cache), dbname_(dbname), db_lock_(NULL), shutting_down_(NULL), @@ -128,14 +130,16 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) logfile_(NULL), logfile_number_(0), log_(NULL), + seed_(0), tmp_batch_(new WriteBatch), bg_compaction_scheduled_(false), - manual_compaction_(NULL) { + manual_compaction_(NULL), + consecutive_compaction_errors_(0) { mem_->Ref(); has_imm_.Release_Store(NULL); // Reserve ten files or so for other uses and give the rest to TableCache. - const int table_cache_size = options.max_open_files - 10; + const int table_cache_size = options_.max_open_files - kNumNonTableCacheFiles; table_cache_ = new TableCache(dbname_, &options_, table_cache_size); versions_ = new VersionSet(dbname_, &options_, table_cache_, @@ -310,16 +314,24 @@ Status DBImpl::Recover(VersionEdit* edit) { if (!s.ok()) { return s; } + std::set<uint64_t> expected; + versions_->AddLiveFiles(&expected); uint64_t number; FileType type; std::vector<uint64_t> logs; for (size_t i = 0; i < filenames.size(); i++) { - if (ParseFileName(filenames[i], &number, &type) - && type == kLogFile - && ((number >= min_log) || (number == prev_log))) { - logs.push_back(number); + if (ParseFileName(filenames[i], &number, &type)) { + expected.erase(number); + if (type == kLogFile && ((number >= min_log) || (number == prev_log))) + logs.push_back(number); } } + if (!expected.empty()) { + char buf[50]; + snprintf(buf, sizeof(buf), "%d missing files; e.g.", + static_cast<int>(expected.size())); + return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin()))); + } // Recover in the order in which the logs were generated std::sort(logs.begin(), logs.end()); @@ -611,6 +623,7 @@ void DBImpl::BackgroundCall() { Status s = BackgroundCompaction(); if (s.ok()) { // Success + consecutive_compaction_errors_ = 0; } else if (shutting_down_.Acquire_Load()) { // Error most likely due to shutdown; do not wait } else { @@ -622,7 +635,12 @@ void DBImpl::BackgroundCall() { Log(options_.info_log, "Waiting after background compaction error: %s", s.ToString().c_str()); mutex_.Unlock(); - env_->SleepForMicroseconds(1000000); + ++consecutive_compaction_errors_; + int seconds_to_sleep = 1; + for (int i = 0; i < 3 && i < consecutive_compaction_errors_ - 1; ++i) { + seconds_to_sleep *= 2; + } + env_->SleepForMicroseconds(seconds_to_sleep * 1000000); mutex_.Lock(); } } @@ -1010,7 +1028,8 @@ static void CleanupIteratorState(void* arg1, void* arg2) { } // namespace Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, - SequenceNumber* latest_snapshot) { + SequenceNumber* latest_snapshot, + uint32_t* seed) { IterState* cleanup = new IterState; mutex_.Lock(); *latest_snapshot = versions_->LastSequence(); @@ -1034,13 +1053,15 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, cleanup->version = versions_->current(); internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, NULL); + *seed = ++seed_; mutex_.Unlock(); return internal_iter; } Iterator* DBImpl::TEST_NewInternalIterator() { SequenceNumber ignored; - return NewInternalIterator(ReadOptions(), &ignored); + uint32_t ignored_seed; + return NewInternalIterator(ReadOptions(), &ignored, &ignored_seed); } int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() { @@ -1097,12 +1118,21 @@ Status DBImpl::Get(const ReadOptions& options, Iterator* DBImpl::NewIterator(const ReadOptions& options) { SequenceNumber latest_snapshot; - Iterator* internal_iter = NewInternalIterator(options, &latest_snapshot); + uint32_t seed; + Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed); return NewDBIterator( - &dbname_, env_, user_comparator(), internal_iter, + this, user_comparator(), iter, (options.snapshot != NULL ? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_ - : latest_snapshot)); + : latest_snapshot), + seed); +} + +void DBImpl::RecordReadSample(Slice key) { + MutexLock l(&mutex_); + if (versions_->current()->RecordReadSample(key)) { + MaybeScheduleCompaction(); + } } const Snapshot* DBImpl::GetSnapshot() { @@ -1268,10 +1298,11 @@ Status DBImpl::MakeRoomForWrite(bool force) { } else if (imm_ != NULL) { // We have filled up the current memtable, but the previous // one is still being compacted, so we wait. + Log(options_.info_log, "Current memtable full; waiting...\n"); bg_cv_.Wait(); } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) { // There are too many level-0 files. - Log(options_.info_log, "waiting...\n"); + Log(options_.info_log, "Too many L0 files; waiting...\n"); bg_cv_.Wait(); } else { // Attempt to switch to a new memtable and trigger compaction of old diff --git a/src/leveldb/db/db_impl.h b/src/leveldb/db/db_impl.h index bd29dd8055..75fd30abe9 100644 --- a/src/leveldb/db/db_impl.h +++ b/src/leveldb/db/db_impl.h @@ -59,13 +59,19 @@ class DBImpl : public DB { // file at a level >= 1. int64_t TEST_MaxNextLevelOverlappingBytes(); + // Record a sample of bytes read at the specified internal key. + // Samples are taken approximately once every config::kReadBytesPeriod + // bytes. + void RecordReadSample(Slice key); + private: friend class DB; struct CompactionState; struct Writer; Iterator* NewInternalIterator(const ReadOptions&, - SequenceNumber* latest_snapshot); + SequenceNumber* latest_snapshot, + uint32_t* seed); Status NewDB(); @@ -135,6 +141,7 @@ class DBImpl : public DB { WritableFile* logfile_; uint64_t logfile_number_; log::Writer* log_; + uint32_t seed_; // For sampling. // Queue of writers. std::deque<Writer*> writers_; @@ -163,6 +170,7 @@ class DBImpl : public DB { // Have we encountered a background error in paranoid mode? Status bg_error_; + int consecutive_compaction_errors_; // Per level compaction stats. stats_[level] stores the stats for // compactions that produced data for the specified "level". diff --git a/src/leveldb/db/db_iter.cc b/src/leveldb/db/db_iter.cc index 87dca2ded4..071a54e3f4 100644 --- a/src/leveldb/db/db_iter.cc +++ b/src/leveldb/db/db_iter.cc @@ -5,12 +5,14 @@ #include "db/db_iter.h" #include "db/filename.h" +#include "db/db_impl.h" #include "db/dbformat.h" #include "leveldb/env.h" #include "leveldb/iterator.h" #include "port/port.h" #include "util/logging.h" #include "util/mutexlock.h" +#include "util/random.h" namespace leveldb { @@ -46,15 +48,16 @@ class DBIter: public Iterator { kReverse }; - DBIter(const std::string* dbname, Env* env, - const Comparator* cmp, Iterator* iter, SequenceNumber s) - : dbname_(dbname), - env_(env), + DBIter(DBImpl* db, const Comparator* cmp, Iterator* iter, SequenceNumber s, + uint32_t seed) + : db_(db), user_comparator_(cmp), iter_(iter), sequence_(s), direction_(kForward), - valid_(false) { + valid_(false), + rnd_(seed), + bytes_counter_(RandomPeriod()) { } virtual ~DBIter() { delete iter_; @@ -100,8 +103,12 @@ class DBIter: public Iterator { } } - const std::string* const dbname_; - Env* const env_; + // Pick next gap with average value of config::kReadBytesPeriod. + ssize_t RandomPeriod() { + return rnd_.Uniform(2*config::kReadBytesPeriod); + } + + DBImpl* db_; const Comparator* const user_comparator_; Iterator* const iter_; SequenceNumber const sequence_; @@ -112,13 +119,23 @@ class DBIter: public Iterator { Direction direction_; bool valid_; + Random rnd_; + ssize_t bytes_counter_; + // No copying allowed DBIter(const DBIter&); void operator=(const DBIter&); }; inline bool DBIter::ParseKey(ParsedInternalKey* ikey) { - if (!ParseInternalKey(iter_->key(), ikey)) { + Slice k = iter_->key(); + ssize_t n = k.size() + iter_->value().size(); + bytes_counter_ -= n; + while (bytes_counter_ < 0) { + bytes_counter_ += RandomPeriod(); + db_->RecordReadSample(k); + } + if (!ParseInternalKey(k, ikey)) { status_ = Status::Corruption("corrupted internal key in DBIter"); return false; } else { @@ -288,12 +305,12 @@ void DBIter::SeekToLast() { } // anonymous namespace Iterator* NewDBIterator( - const std::string* dbname, - Env* env, + DBImpl* db, const Comparator* user_key_comparator, Iterator* internal_iter, - const SequenceNumber& sequence) { - return new DBIter(dbname, env, user_key_comparator, internal_iter, sequence); + SequenceNumber sequence, + uint32_t seed) { + return new DBIter(db, user_key_comparator, internal_iter, sequence, seed); } } // namespace leveldb diff --git a/src/leveldb/db/db_iter.h b/src/leveldb/db/db_iter.h index d9e1b174ab..04927e937b 100644 --- a/src/leveldb/db/db_iter.h +++ b/src/leveldb/db/db_iter.h @@ -11,15 +11,17 @@ namespace leveldb { +class DBImpl; + // Return a new iterator that converts internal keys (yielded by // "*internal_iter") that were live at the specified "sequence" number // into appropriate user keys. extern Iterator* NewDBIterator( - const std::string* dbname, - Env* env, + DBImpl* db, const Comparator* user_key_comparator, Iterator* internal_iter, - const SequenceNumber& sequence); + SequenceNumber sequence, + uint32_t seed); } // namespace leveldb diff --git a/src/leveldb/db/db_test.cc b/src/leveldb/db/db_test.cc index 684ea3bdbc..49aae04dbd 100644 --- a/src/leveldb/db/db_test.cc +++ b/src/leveldb/db/db_test.cc @@ -33,8 +33,11 @@ class AtomicCounter { public: AtomicCounter() : count_(0) { } void Increment() { + IncrementBy(1); + } + void IncrementBy(int count) { MutexLock l(&mu_); - count_++; + count_ += count; } int Read() { MutexLock l(&mu_); @@ -45,6 +48,10 @@ class AtomicCounter { count_ = 0; } }; + +void DelayMilliseconds(int millis) { + Env::Default()->SleepForMicroseconds(millis * 1000); +} } // Special Env used to delay background operations @@ -69,6 +76,7 @@ class SpecialEnv : public EnvWrapper { AtomicCounter random_read_counter_; AtomicCounter sleep_counter_; + AtomicCounter sleep_time_counter_; explicit SpecialEnv(Env* base) : EnvWrapper(base) { delay_sstable_sync_.Release_Store(NULL); @@ -103,7 +111,7 @@ class SpecialEnv : public EnvWrapper { Status Flush() { return base_->Flush(); } Status Sync() { while (env_->delay_sstable_sync_.Acquire_Load() != NULL) { - env_->SleepForMicroseconds(100000); + DelayMilliseconds(100); } return base_->Sync(); } @@ -174,8 +182,9 @@ class SpecialEnv : public EnvWrapper { virtual void SleepForMicroseconds(int micros) { sleep_counter_.Increment(); - target()->SleepForMicroseconds(micros); + sleep_time_counter_.IncrementBy(micros); } + }; class DBTest { @@ -461,6 +470,20 @@ class DBTest { } return result; } + + bool DeleteAnSSTFile() { + std::vector<std::string> filenames; + ASSERT_OK(env_->GetChildren(dbname_, &filenames)); + uint64_t number; + FileType type; + for (size_t i = 0; i < filenames.size(); i++) { + if (ParseFileName(filenames[i], &number, &type) && type == kTableFile) { + ASSERT_OK(env_->DeleteFile(TableFileName(dbname_, number))); + return true; + } + } + return false; + } }; TEST(DBTest, Empty) { @@ -611,7 +634,7 @@ TEST(DBTest, GetEncountersEmptyLevel) { } // Step 4: Wait for compaction to finish - env_->SleepForMicroseconds(1000000); + DelayMilliseconds(1000); ASSERT_EQ(NumTableFilesAtLevel(0), 0); } while (ChangeOptions()); @@ -1295,7 +1318,7 @@ TEST(DBTest, L0_CompactionBug_Issue44_a) { Reopen(); Reopen(); ASSERT_EQ("(a->v)", Contents()); - env_->SleepForMicroseconds(1000000); // Wait for compaction to finish + DelayMilliseconds(1000); // Wait for compaction to finish ASSERT_EQ("(a->v)", Contents()); } @@ -1311,7 +1334,7 @@ TEST(DBTest, L0_CompactionBug_Issue44_b) { Put("",""); Reopen(); Put("",""); - env_->SleepForMicroseconds(1000000); // Wait for compaction to finish + DelayMilliseconds(1000); // Wait for compaction to finish Reopen(); Put("d","dv"); Reopen(); @@ -1321,7 +1344,7 @@ TEST(DBTest, L0_CompactionBug_Issue44_b) { Delete("b"); Reopen(); ASSERT_EQ("(->)(c->cv)", Contents()); - env_->SleepForMicroseconds(1000000); // Wait for compaction to finish + DelayMilliseconds(1000); // Wait for compaction to finish ASSERT_EQ("(->)(c->cv)", Contents()); } @@ -1506,6 +1529,30 @@ TEST(DBTest, NoSpace) { ASSERT_GE(env_->sleep_counter_.Read(), 5); } +TEST(DBTest, ExponentialBackoff) { + Options options = CurrentOptions(); + options.env = env_; + Reopen(&options); + + ASSERT_OK(Put("foo", "v1")); + ASSERT_EQ("v1", Get("foo")); + Compact("a", "z"); + env_->non_writable_.Release_Store(env_); // Force errors for new files + env_->sleep_counter_.Reset(); + env_->sleep_time_counter_.Reset(); + for (int i = 0; i < 5; i++) { + dbfull()->TEST_CompactRange(2, NULL, NULL); + } + env_->non_writable_.Release_Store(NULL); + + // Wait for compaction to finish + DelayMilliseconds(1000); + + ASSERT_GE(env_->sleep_counter_.Read(), 5); + ASSERT_LT(env_->sleep_counter_.Read(), 10); + ASSERT_GE(env_->sleep_time_counter_.Read(), 10e6); +} + TEST(DBTest, NonWritableFileSystem) { Options options = CurrentOptions(); options.write_buffer_size = 1000; @@ -1519,7 +1566,7 @@ TEST(DBTest, NonWritableFileSystem) { fprintf(stderr, "iter %d; errors %d\n", i, errors); if (!Put("foo", big).ok()) { errors++; - env_->SleepForMicroseconds(100000); + DelayMilliseconds(100); } } ASSERT_GT(errors, 0); @@ -1567,6 +1614,24 @@ TEST(DBTest, ManifestWriteError) { } } +TEST(DBTest, MissingSSTFile) { + ASSERT_OK(Put("foo", "bar")); + ASSERT_EQ("bar", Get("foo")); + + // Dump the memtable to disk. + dbfull()->TEST_CompactMemTable(); + ASSERT_EQ("bar", Get("foo")); + + Close(); + ASSERT_TRUE(DeleteAnSSTFile()); + Options options = CurrentOptions(); + options.paranoid_checks = true; + Status s = TryReopen(&options); + ASSERT_TRUE(!s.ok()); + ASSERT_TRUE(s.ToString().find("issing") != std::string::npos) + << s.ToString(); +} + TEST(DBTest, FilesDeletedAfterCompaction) { ASSERT_OK(Put("foo", "v2")); Compact("a", "z"); @@ -1711,13 +1776,13 @@ TEST(DBTest, MultiThreaded) { } // Let them run for a while - env_->SleepForMicroseconds(kTestSeconds * 1000000); + DelayMilliseconds(kTestSeconds * 1000); // Stop the threads and wait for them to finish mt.stop.Release_Store(&mt); for (int id = 0; id < kNumThreads; id++) { while (mt.thread_done[id].Acquire_Load() == NULL) { - env_->SleepForMicroseconds(100000); + DelayMilliseconds(100); } } } while (ChangeOptions()); diff --git a/src/leveldb/db/dbformat.cc b/src/leveldb/db/dbformat.cc index 28e11b398d..20a7ca4462 100644 --- a/src/leveldb/db/dbformat.cc +++ b/src/leveldb/db/dbformat.cc @@ -26,7 +26,7 @@ std::string ParsedInternalKey::DebugString() const { (unsigned long long) sequence, int(type)); std::string result = "'"; - result += user_key.ToString(); + result += EscapeString(user_key.ToString()); result += buf; return result; } diff --git a/src/leveldb/db/dbformat.h b/src/leveldb/db/dbformat.h index f7f64dafb6..5d8a032bd3 100644 --- a/src/leveldb/db/dbformat.h +++ b/src/leveldb/db/dbformat.h @@ -38,6 +38,9 @@ static const int kL0_StopWritesTrigger = 12; // space if the same key space is being repeatedly overwritten. static const int kMaxMemCompactLevel = 2; +// Approximate gap in bytes between samples of data read during iteration. +static const int kReadBytesPeriod = 1048576; + } // namespace config class InternalKey; diff --git a/src/leveldb/db/filename_test.cc b/src/leveldb/db/filename_test.cc index 47353d6c9a..5a26da4728 100644 --- a/src/leveldb/db/filename_test.cc +++ b/src/leveldb/db/filename_test.cc @@ -70,7 +70,7 @@ TEST(FileNameTest, Parse) { for (int i = 0; i < sizeof(errors) / sizeof(errors[0]); i++) { std::string f = errors[i]; ASSERT_TRUE(!ParseFileName(f, &number, &type)) << f; - }; + } } TEST(FileNameTest, Construction) { diff --git a/src/leveldb/db/version_set.cc b/src/leveldb/db/version_set.cc index 7d0a5de2b9..66d73be71f 100644 --- a/src/leveldb/db/version_set.cc +++ b/src/leveldb/db/version_set.cc @@ -289,6 +289,51 @@ static bool NewestFirst(FileMetaData* a, FileMetaData* b) { return a->number > b->number; } +void Version::ForEachOverlapping(Slice user_key, Slice internal_key, + void* arg, + bool (*func)(void*, int, FileMetaData*)) { + // TODO(sanjay): Change Version::Get() to use this function. + const Comparator* ucmp = vset_->icmp_.user_comparator(); + + // Search level-0 in order from newest to oldest. + std::vector<FileMetaData*> tmp; + tmp.reserve(files_[0].size()); + for (uint32_t i = 0; i < files_[0].size(); i++) { + FileMetaData* f = files_[0][i]; + if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 && + ucmp->Compare(user_key, f->largest.user_key()) <= 0) { + tmp.push_back(f); + } + } + if (!tmp.empty()) { + std::sort(tmp.begin(), tmp.end(), NewestFirst); + for (uint32_t i = 0; i < tmp.size(); i++) { + if (!(*func)(arg, 0, tmp[i])) { + return; + } + } + } + + // Search other levels. + for (int level = 1; level < config::kNumLevels; level++) { + size_t num_files = files_[level].size(); + if (num_files == 0) continue; + + // Binary search to find earliest index whose largest key >= internal_key. + uint32_t index = FindFile(vset_->icmp_, files_[level], internal_key); + if (index < num_files) { + FileMetaData* f = files_[level][index]; + if (ucmp->Compare(user_key, f->smallest.user_key()) < 0) { + // All of "f" is past any data for user_key + } else { + if (!(*func)(arg, level, f)) { + return; + } + } + } + } +} + Status Version::Get(const ReadOptions& options, const LookupKey& k, std::string* value, @@ -401,6 +446,44 @@ bool Version::UpdateStats(const GetStats& stats) { return false; } +bool Version::RecordReadSample(Slice internal_key) { + ParsedInternalKey ikey; + if (!ParseInternalKey(internal_key, &ikey)) { + return false; + } + + struct State { + GetStats stats; // Holds first matching file + int matches; + + static bool Match(void* arg, int level, FileMetaData* f) { + State* state = reinterpret_cast<State*>(arg); + state->matches++; + if (state->matches == 1) { + // Remember first match. + state->stats.seek_file = f; + state->stats.seek_file_level = level; + } + // We can stop iterating once we have a second match. + return state->matches < 2; + } + }; + + State state; + state.matches = 0; + ForEachOverlapping(ikey.user_key, internal_key, &state, &State::Match); + + // Must have at least two matches since we want to merge across + // files. But what if we have a single file that contains many + // overwrites and deletions? Should we have another mechanism for + // finding such files? + if (state.matches >= 2) { + // 1MB cost is about 1 seek (see comment in Builder::Apply). + return UpdateStats(state.stats); + } + return false; +} + void Version::Ref() { ++refs_; } @@ -435,10 +518,13 @@ int Version::PickLevelForMemTableOutput( if (OverlapInLevel(level + 1, &smallest_user_key, &largest_user_key)) { break; } - GetOverlappingInputs(level + 2, &start, &limit, &overlaps); - const int64_t sum = TotalFileSize(overlaps); - if (sum > kMaxGrandParentOverlapBytes) { - break; + if (level + 2 < config::kNumLevels) { + // Check that file does not overlap too many grandparent bytes. + GetOverlappingInputs(level + 2, &start, &limit, &overlaps); + const int64_t sum = TotalFileSize(overlaps); + if (sum > kMaxGrandParentOverlapBytes) { + break; + } } level++; } @@ -452,6 +538,8 @@ void Version::GetOverlappingInputs( const InternalKey* begin, const InternalKey* end, std::vector<FileMetaData*>* inputs) { + assert(level >= 0); + assert(level < config::kNumLevels); inputs->clear(); Slice user_begin, user_end; if (begin != NULL) { @@ -1331,14 +1419,19 @@ Compaction* VersionSet::CompactRange( } // Avoid compacting too much in one shot in case the range is large. - const uint64_t limit = MaxFileSizeForLevel(level); - uint64_t total = 0; - for (size_t i = 0; i < inputs.size(); i++) { - uint64_t s = inputs[i]->file_size; - total += s; - if (total >= limit) { - inputs.resize(i + 1); - break; + // But we cannot do this for level-0 since level-0 files can overlap + // and we must not pick one file and drop another older file if the + // two files overlap. + if (level > 0) { + const uint64_t limit = MaxFileSizeForLevel(level); + uint64_t total = 0; + for (size_t i = 0; i < inputs.size(); i++) { + uint64_t s = inputs[i]->file_size; + total += s; + if (total >= limit) { + inputs.resize(i + 1); + break; + } } } diff --git a/src/leveldb/db/version_set.h b/src/leveldb/db/version_set.h index 9d084fdb7d..20de0e2629 100644 --- a/src/leveldb/db/version_set.h +++ b/src/leveldb/db/version_set.h @@ -78,6 +78,12 @@ class Version { // REQUIRES: lock is held bool UpdateStats(const GetStats& stats); + // Record a sample of bytes read at the specified internal key. + // Samples are taken approximately once every config::kReadBytesPeriod + // bytes. Returns true if a new compaction may need to be triggered. + // REQUIRES: lock is held + bool RecordReadSample(Slice key); + // Reference count management (so Versions do not disappear out from // under live iterators) void Ref(); @@ -114,6 +120,15 @@ class Version { class LevelFileNumIterator; Iterator* NewConcatenatingIterator(const ReadOptions&, int level) const; + // Call func(arg, level, f) for every file that overlaps user_key in + // order from newest to oldest. If an invocation of func returns + // false, makes no more calls. + // + // REQUIRES: user portion of internal_key == user_key. + void ForEachOverlapping(Slice user_key, Slice internal_key, + void* arg, + bool (*func)(void*, int, FileMetaData*)); + VersionSet* vset_; // VersionSet to which this Version belongs Version* next_; // Next version in linked list Version* prev_; // Previous version in linked list diff --git a/src/leveldb/include/leveldb/db.h b/src/leveldb/include/leveldb/db.h index 29d3674479..57c00a5da0 100644 --- a/src/leveldb/include/leveldb/db.h +++ b/src/leveldb/include/leveldb/db.h @@ -14,7 +14,7 @@ namespace leveldb { // Update Makefile if you change these static const int kMajorVersion = 1; -static const int kMinorVersion = 9; +static const int kMinorVersion = 13; struct Options; struct ReadOptions; diff --git a/src/leveldb/issues/issue178_test.cc b/src/leveldb/issues/issue178_test.cc new file mode 100644 index 0000000000..1b1cf8bb28 --- /dev/null +++ b/src/leveldb/issues/issue178_test.cc @@ -0,0 +1,92 @@ +// Copyright (c) 2013 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +// Test for issue 178: a manual compaction causes deleted data to reappear. +#include <iostream> +#include <sstream> +#include <cstdlib> + +#include "leveldb/db.h" +#include "leveldb/write_batch.h" +#include "util/testharness.h" + +namespace { + +const int kNumKeys = 1100000; + +std::string Key1(int i) { + char buf[100]; + snprintf(buf, sizeof(buf), "my_key_%d", i); + return buf; +} + +std::string Key2(int i) { + return Key1(i) + "_xxx"; +} + +class Issue178 { }; + +TEST(Issue178, Test) { + // Get rid of any state from an old run. + std::string dbpath = leveldb::test::TmpDir() + "/leveldb_cbug_test"; + DestroyDB(dbpath, leveldb::Options()); + + // Open database. Disable compression since it affects the creation + // of layers and the code below is trying to test against a very + // specific scenario. + leveldb::DB* db; + leveldb::Options db_options; + db_options.create_if_missing = true; + db_options.compression = leveldb::kNoCompression; + ASSERT_OK(leveldb::DB::Open(db_options, dbpath, &db)); + + // create first key range + leveldb::WriteBatch batch; + for (size_t i = 0; i < kNumKeys; i++) { + batch.Put(Key1(i), "value for range 1 key"); + } + ASSERT_OK(db->Write(leveldb::WriteOptions(), &batch)); + + // create second key range + batch.Clear(); + for (size_t i = 0; i < kNumKeys; i++) { + batch.Put(Key2(i), "value for range 2 key"); + } + ASSERT_OK(db->Write(leveldb::WriteOptions(), &batch)); + + // delete second key range + batch.Clear(); + for (size_t i = 0; i < kNumKeys; i++) { + batch.Delete(Key2(i)); + } + ASSERT_OK(db->Write(leveldb::WriteOptions(), &batch)); + + // compact database + std::string start_key = Key1(0); + std::string end_key = Key1(kNumKeys - 1); + leveldb::Slice least(start_key.data(), start_key.size()); + leveldb::Slice greatest(end_key.data(), end_key.size()); + + // commenting out the line below causes the example to work correctly + db->CompactRange(&least, &greatest); + + // count the keys + leveldb::Iterator* iter = db->NewIterator(leveldb::ReadOptions()); + size_t num_keys = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + num_keys++; + } + delete iter; + ASSERT_EQ(kNumKeys, num_keys) << "Bad number of keys"; + + // close database + delete db; + DestroyDB(dbpath, leveldb::Options()); +} + +} // anonymous namespace + +int main(int argc, char** argv) { + return leveldb::test::RunAllTests(); +} diff --git a/src/leveldb/port/port_posix.h b/src/leveldb/port/port_posix.h index f2b89bffb9..21c845e211 100644 --- a/src/leveldb/port/port_posix.h +++ b/src/leveldb/port/port_posix.h @@ -62,12 +62,16 @@ #define fflush_unlocked fflush #endif -#if defined(OS_MACOSX) || defined(OS_FREEBSD) ||\ +#if defined(OS_FREEBSD) ||\ defined(OS_OPENBSD) || defined(OS_DRAGONFLYBSD) // Use fsync() on platforms without fdatasync() #define fdatasync fsync #endif +#if defined(OS_MACOSX) +#define fdatasync(fd) fcntl(fd, F_FULLFSYNC, 0) +#endif + #if defined(OS_ANDROID) && __ANDROID_API__ < 9 // fdatasync() was only introduced in API level 9 on Android. Use fsync() // when targetting older platforms. diff --git a/src/leveldb/port/port_win.cc b/src/leveldb/port/port_win.cc index 99c1d8e346..1b0f060a19 100644 --- a/src/leveldb/port/port_win.cc +++ b/src/leveldb/port/port_win.cc @@ -109,12 +109,10 @@ void CondVar::Signal() { void CondVar::SignalAll() { wait_mtx_.Lock(); - for(long i = 0; i < waiting_; ++i) { - ::ReleaseSemaphore(sem1_, 1, NULL); - while(waiting_ > 0) { - --waiting_; - ::WaitForSingleObject(sem2_, INFINITE); - } + ::ReleaseSemaphore(sem1_, waiting_, NULL); + while(waiting_ > 0) { + --waiting_; + ::WaitForSingleObject(sem2_, INFINITE); } wait_mtx_.Unlock(); } diff --git a/src/leveldb/table/block.cc b/src/leveldb/table/block.cc index ab83c1112c..79ea9d9ee5 100644 --- a/src/leveldb/table/block.cc +++ b/src/leveldb/table/block.cc @@ -16,7 +16,7 @@ namespace leveldb { inline uint32_t Block::NumRestarts() const { - assert(size_ >= 2*sizeof(uint32_t)); + assert(size_ >= sizeof(uint32_t)); return DecodeFixed32(data_ + size_ - sizeof(uint32_t)); } @@ -27,11 +27,12 @@ Block::Block(const BlockContents& contents) if (size_ < sizeof(uint32_t)) { size_ = 0; // Error marker } else { - restart_offset_ = size_ - (1 + NumRestarts()) * sizeof(uint32_t); - if (restart_offset_ > size_ - sizeof(uint32_t)) { - // The size is too small for NumRestarts() and therefore - // restart_offset_ wrapped around. + size_t max_restarts_allowed = (size_-sizeof(uint32_t)) / sizeof(uint32_t); + if (NumRestarts() > max_restarts_allowed) { + // The size is too small for NumRestarts() size_ = 0; + } else { + restart_offset_ = size_ - (1 + NumRestarts()) * sizeof(uint32_t); } } } @@ -253,7 +254,7 @@ class Block::Iter : public Iterator { }; Iterator* Block::NewIterator(const Comparator* cmp) { - if (size_ < 2*sizeof(uint32_t)) { + if (size_ < sizeof(uint32_t)) { return NewErrorIterator(Status::Corruption("bad block contents")); } const uint32_t num_restarts = NumRestarts(); diff --git a/src/leveldb/table/table.cc b/src/leveldb/table/table.cc index dbd6d3a1bf..71c1756e5f 100644 --- a/src/leveldb/table/table.cc +++ b/src/leveldb/table/table.cc @@ -228,7 +228,6 @@ Status Table::InternalGet(const ReadOptions& options, const Slice& k, !filter->KeyMayMatch(handle.offset(), k)) { // Not found } else { - Slice handle = iiter->value(); Iterator* block_iter = BlockReader(this, options, iiter->value()); block_iter->Seek(k); if (block_iter->Valid()) { diff --git a/src/leveldb/table/table_test.cc b/src/leveldb/table/table_test.cc index 57cea25334..c723bf84cf 100644 --- a/src/leveldb/table/table_test.cc +++ b/src/leveldb/table/table_test.cc @@ -644,6 +644,36 @@ class Harness { Constructor* constructor_; }; +// Test empty table/block. +TEST(Harness, Empty) { + for (int i = 0; i < kNumTestArgs; i++) { + Init(kTestArgList[i]); + Random rnd(test::RandomSeed() + 1); + Test(&rnd); + } +} + +// Special test for a block with no restart entries. The C++ leveldb +// code never generates such blocks, but the Java version of leveldb +// seems to. +TEST(Harness, ZeroRestartPointsInBlock) { + char data[sizeof(uint32_t)]; + memset(data, 0, sizeof(data)); + BlockContents contents; + contents.data = Slice(data, sizeof(data)); + contents.cachable = false; + contents.heap_allocated = false; + Block block(contents); + Iterator* iter = block.NewIterator(BytewiseComparator()); + iter->SeekToFirst(); + ASSERT_TRUE(!iter->Valid()); + iter->SeekToLast(); + ASSERT_TRUE(!iter->Valid()); + iter->Seek("foo"); + ASSERT_TRUE(!iter->Valid()); + delete iter; +} + // Test the empty key TEST(Harness, SimpleEmptyKey) { for (int i = 0; i < kNumTestArgs; i++) { diff --git a/src/leveldb/util/cache.cc b/src/leveldb/util/cache.cc index 24f1f63f4f..8b197bc02a 100644 --- a/src/leveldb/util/cache.cc +++ b/src/leveldb/util/cache.cc @@ -116,7 +116,6 @@ class HandleTable { LRUHandle* h = list_[i]; while (h != NULL) { LRUHandle* next = h->next_hash; - Slice key = h->key(); uint32_t hash = h->hash; LRUHandle** ptr = &new_list[hash & (new_length - 1)]; h->next_hash = *ptr; @@ -160,7 +159,6 @@ class LRUCache { // mutex_ protects the following state. port::Mutex mutex_; size_t usage_; - uint64_t last_id_; // Dummy head of LRU list. // lru.prev is newest entry, lru.next is oldest entry. @@ -170,8 +168,7 @@ class LRUCache { }; LRUCache::LRUCache() - : usage_(0), - last_id_(0) { + : usage_(0) { // Make empty circular linked list lru_.next = &lru_; lru_.prev = &lru_; diff --git a/src/leveldb/util/coding_test.cc b/src/leveldb/util/coding_test.cc index 2c52b17b60..fb5726e335 100644 --- a/src/leveldb/util/coding_test.cc +++ b/src/leveldb/util/coding_test.cc @@ -109,7 +109,7 @@ TEST(Coding, Varint64) { values.push_back(power); values.push_back(power-1); values.push_back(power+1); - }; + } std::string s; for (int i = 0; i < values.size(); i++) { diff --git a/src/leveldb/util/env_posix.cc b/src/leveldb/util/env_posix.cc index db81f56d11..0f5dcfac5a 100644 --- a/src/leveldb/util/env_posix.cc +++ b/src/leveldb/util/env_posix.cc @@ -320,8 +320,39 @@ class PosixMmapFile : public WritableFile { return Status::OK(); } - virtual Status Sync() { + Status SyncDirIfManifest() { + const char* f = filename_.c_str(); + const char* sep = strrchr(f, '/'); + Slice basename; + std::string dir; + if (sep == NULL) { + dir = "."; + basename = f; + } else { + dir = std::string(f, sep - f); + basename = sep + 1; + } Status s; + if (basename.starts_with("MANIFEST")) { + int fd = open(dir.c_str(), O_RDONLY); + if (fd < 0) { + s = IOError(dir, errno); + } else { + if (fsync(fd) < 0) { + s = IOError(dir, errno); + } + close(fd); + } + } + return s; + } + + virtual Status Sync() { + // Ensure new files referred to by the manifest are in the filesystem. + Status s = SyncDirIfManifest(); + if (!s.ok()) { + return s; + } if (pending_sync_) { // Some unmapped data was not synced @@ -386,7 +417,7 @@ class PosixEnv : public Env { PosixEnv(); virtual ~PosixEnv() { fprintf(stderr, "Destroying Env::Default()\n"); - exit(1); + abort(); } virtual Status NewSequentialFile(const std::string& fname, @@ -467,7 +498,7 @@ class PosixEnv : public Env { result = IOError(fname, errno); } return result; - }; + } virtual Status CreateDir(const std::string& name) { Status result; @@ -475,7 +506,7 @@ class PosixEnv : public Env { result = IOError(name, errno); } return result; - }; + } virtual Status DeleteDir(const std::string& name) { Status result; @@ -483,7 +514,7 @@ class PosixEnv : public Env { result = IOError(name, errno); } return result; - }; + } virtual Status GetFileSize(const std::string& fname, uint64_t* size) { Status s; @@ -589,7 +620,7 @@ class PosixEnv : public Env { void PthreadCall(const char* label, int result) { if (result != 0) { fprintf(stderr, "pthread %s: %s\n", label, strerror(result)); - exit(1); + abort(); } } diff --git a/src/leveldb/util/hash.cc b/src/leveldb/util/hash.cc index ba1818082d..07cf022060 100644 --- a/src/leveldb/util/hash.cc +++ b/src/leveldb/util/hash.cc @@ -6,6 +6,13 @@ #include "util/coding.h" #include "util/hash.h" +// The FALLTHROUGH_INTENDED macro can be used to annotate implicit fall-through +// between switch labels. The real definition should be provided externally. +// This one is a fallback version for unsupported compilers. +#ifndef FALLTHROUGH_INTENDED +#define FALLTHROUGH_INTENDED do { } while (0) +#endif + namespace leveldb { uint32_t Hash(const char* data, size_t n, uint32_t seed) { @@ -28,10 +35,10 @@ uint32_t Hash(const char* data, size_t n, uint32_t seed) { switch (limit - data) { case 3: h += data[2] << 16; - // fall through + FALLTHROUGH_INTENDED; case 2: h += data[1] << 8; - // fall through + FALLTHROUGH_INTENDED; case 1: h += data[0]; h *= m; diff --git a/src/leveldb/util/random.h b/src/leveldb/util/random.h index 07538242ea..ddd51b1c7b 100644 --- a/src/leveldb/util/random.h +++ b/src/leveldb/util/random.h @@ -16,7 +16,12 @@ class Random { private: uint32_t seed_; public: - explicit Random(uint32_t s) : seed_(s & 0x7fffffffu) { } + explicit Random(uint32_t s) : seed_(s & 0x7fffffffu) { + // Avoid bad seeds. + if (seed_ == 0 || seed_ == 2147483647L) { + seed_ = 1; + } + } uint32_t Next() { static const uint32_t M = 2147483647L; // 2^31-1 static const uint64_t A = 16807; // bits 14, 8, 7, 5, 2, 1, 0 |