aboutsummaryrefslogtreecommitdiff
path: root/src/leveldb
diff options
context:
space:
mode:
Diffstat (limited to 'src/leveldb')
-rw-r--r--src/leveldb/.gitignore1
-rw-r--r--src/leveldb/AUTHORS3
-rw-r--r--src/leveldb/Makefile12
-rwxr-xr-xsrc/leveldb/build_detect_platform18
-rw-r--r--src/leveldb/db/autocompact_test.cc118
-rw-r--r--src/leveldb/db/corruption_test.cc51
-rw-r--r--src/leveldb/db/db_impl.cc79
-rw-r--r--src/leveldb/db/db_impl.h10
-rw-r--r--src/leveldb/db/db_iter.cc41
-rw-r--r--src/leveldb/db/db_iter.h8
-rw-r--r--src/leveldb/db/db_test.cc85
-rw-r--r--src/leveldb/db/dbformat.cc2
-rw-r--r--src/leveldb/db/dbformat.h3
-rw-r--r--src/leveldb/db/filename_test.cc2
-rw-r--r--src/leveldb/db/version_set.cc117
-rw-r--r--src/leveldb/db/version_set.h15
-rw-r--r--src/leveldb/include/leveldb/db.h2
-rw-r--r--src/leveldb/issues/issue178_test.cc92
-rw-r--r--src/leveldb/port/port_posix.h6
-rw-r--r--src/leveldb/port/port_win.cc10
-rw-r--r--src/leveldb/table/block.cc13
-rw-r--r--src/leveldb/table/table.cc1
-rw-r--r--src/leveldb/table/table_test.cc30
-rw-r--r--src/leveldb/util/cache.cc5
-rw-r--r--src/leveldb/util/coding_test.cc2
-rw-r--r--src/leveldb/util/env_posix.cc43
-rw-r--r--src/leveldb/util/hash.cc11
-rw-r--r--src/leveldb/util/random.h7
28 files changed, 661 insertions, 126 deletions
diff --git a/src/leveldb/.gitignore b/src/leveldb/.gitignore
index 55ba072e0e..71d87a4eeb 100644
--- a/src/leveldb/.gitignore
+++ b/src/leveldb/.gitignore
@@ -6,6 +6,7 @@ build_config.mk
*.so.*
*_test
db_bench
+leveldbutil
Release
Debug
Benchmark
diff --git a/src/leveldb/AUTHORS b/src/leveldb/AUTHORS
index 27a9407e52..fc40194ab9 100644
--- a/src/leveldb/AUTHORS
+++ b/src/leveldb/AUTHORS
@@ -6,3 +6,6 @@ Google Inc.
# Initial version authors:
Jeffrey Dean <jeff@google.com>
Sanjay Ghemawat <sanjay@google.com>
+
+# Partial list of contributors:
+Kevin Regan <kevin.d.regan@gmail.com>
diff --git a/src/leveldb/Makefile b/src/leveldb/Makefile
index 42c4952fec..20c9c4f287 100644
--- a/src/leveldb/Makefile
+++ b/src/leveldb/Makefile
@@ -12,7 +12,7 @@ OPT ?= -O2 -DNDEBUG # (A) Production use (optimized mode)
#-----------------------------------------------
# detect what platform we're building on
-$(shell CC=$(CC) CXX=$(CXX) TARGET_OS=$(TARGET_OS) \
+$(shell CC="$(CC)" CXX="$(CXX)" TARGET_OS="$(TARGET_OS)" \
./build_detect_platform build_config.mk ./)
# this file is generated by the previous line to set build flags and sources
include build_config.mk
@@ -31,6 +31,7 @@ TESTHARNESS = ./util/testharness.o $(TESTUTIL)
TESTS = \
arena_test \
+ autocompact_test \
bloom_test \
c_test \
cache_test \
@@ -42,6 +43,7 @@ TESTS = \
env_test \
filename_test \
filter_block_test \
+ issue178_test \
log_test \
memenv_test \
skiplist_test \
@@ -69,7 +71,7 @@ SHARED = $(SHARED1)
else
# Update db.h if you change these.
SHARED_MAJOR = 1
-SHARED_MINOR = 9
+SHARED_MINOR = 13
SHARED1 = libleveldb.$(PLATFORM_SHARED_EXT)
SHARED2 = $(SHARED1).$(SHARED_MAJOR)
SHARED3 = $(SHARED1).$(SHARED_MAJOR).$(SHARED_MINOR)
@@ -113,6 +115,9 @@ leveldbutil: db/leveldb_main.o $(LIBOBJECTS)
arena_test: util/arena_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) util/arena_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
+autocompact_test: db/autocompact_test.o $(LIBOBJECTS) $(TESTHARNESS)
+ $(CXX) $(LDFLAGS) db/autocompact_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
+
bloom_test: util/bloom_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) util/bloom_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
@@ -146,6 +151,9 @@ filename_test: db/filename_test.o $(LIBOBJECTS) $(TESTHARNESS)
filter_block_test: table/filter_block_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) table/filter_block_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
+issue178_test: issues/issue178_test.o $(LIBOBJECTS) $(TESTHARNESS)
+ $(CXX) $(LDFLAGS) issues/issue178_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
+
log_test: db/log_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) db/log_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
diff --git a/src/leveldb/build_detect_platform b/src/leveldb/build_detect_platform
index 609cb51224..bdfd64172c 100755
--- a/src/leveldb/build_detect_platform
+++ b/src/leveldb/build_detect_platform
@@ -44,6 +44,10 @@ if test -z "$CXX"; then
CXX=g++
fi
+if test -z "$TMPDIR"; then
+ TMPDIR=/tmp
+fi
+
# Detect OS
if test -z "$TARGET_OS"; then
TARGET_OS=`uname -s`
@@ -94,6 +98,12 @@ case "$TARGET_OS" in
PLATFORM_LIBS="-lpthread"
PORT_FILE=port/port_posix.cc
;;
+ GNU/kFreeBSD)
+ PLATFORM=OS_KFREEBSD
+ COMMON_FLAGS="$MEMCMP_FLAG -D_REENTRANT -DOS_KFREEBSD"
+ PLATFORM_LIBS="-lpthread"
+ PORT_FILE=port/port_posix.cc
+ ;;
NetBSD)
PLATFORM=OS_NETBSD
COMMON_FLAGS="$MEMCMP_FLAG -D_REENTRANT -DOS_NETBSD"
@@ -163,8 +173,10 @@ if [ "$CROSS_COMPILE" = "true" ]; then
# Cross-compiling; do not try any compilation tests.
true
else
+ CXXOUTPUT="${TMPDIR}/leveldb_build_detect_platform-cxx.$$"
+
# If -std=c++0x works, use <cstdatomic>. Otherwise use port_posix.h.
- $CXX $CXXFLAGS -std=c++0x -x c++ - -o /dev/null 2>/dev/null <<EOF
+ $CXX $CXXFLAGS -std=c++0x -x c++ - -o $CXXOUTPUT 2>/dev/null <<EOF
#include <cstdatomic>
int main() {}
EOF
@@ -176,12 +188,14 @@ EOF
fi
# Test whether tcmalloc is available
- $CXX $CXXFLAGS -x c++ - -o /dev/null -ltcmalloc 2>/dev/null <<EOF
+ $CXX $CXXFLAGS -x c++ - -o $CXXOUTPUT -ltcmalloc 2>/dev/null <<EOF
int main() {}
EOF
if [ "$?" = 0 ]; then
PLATFORM_LIBS="$PLATFORM_LIBS -ltcmalloc"
fi
+
+ rm -f $CXXOUTPUT 2>/dev/null
fi
PLATFORM_CCFLAGS="$PLATFORM_CCFLAGS $COMMON_FLAGS"
diff --git a/src/leveldb/db/autocompact_test.cc b/src/leveldb/db/autocompact_test.cc
new file mode 100644
index 0000000000..d20a2362c3
--- /dev/null
+++ b/src/leveldb/db/autocompact_test.cc
@@ -0,0 +1,118 @@
+// Copyright (c) 2013 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "leveldb/db.h"
+#include "db/db_impl.h"
+#include "leveldb/cache.h"
+#include "util/testharness.h"
+#include "util/testutil.h"
+
+namespace leveldb {
+
+class AutoCompactTest {
+ public:
+ std::string dbname_;
+ Cache* tiny_cache_;
+ Options options_;
+ DB* db_;
+
+ AutoCompactTest() {
+ dbname_ = test::TmpDir() + "/autocompact_test";
+ tiny_cache_ = NewLRUCache(100);
+ options_.block_cache = tiny_cache_;
+ DestroyDB(dbname_, options_);
+ options_.create_if_missing = true;
+ options_.compression = kNoCompression;
+ ASSERT_OK(DB::Open(options_, dbname_, &db_));
+ }
+
+ ~AutoCompactTest() {
+ delete db_;
+ DestroyDB(dbname_, Options());
+ delete tiny_cache_;
+ }
+
+ std::string Key(int i) {
+ char buf[100];
+ snprintf(buf, sizeof(buf), "key%06d", i);
+ return std::string(buf);
+ }
+
+ uint64_t Size(const Slice& start, const Slice& limit) {
+ Range r(start, limit);
+ uint64_t size;
+ db_->GetApproximateSizes(&r, 1, &size);
+ return size;
+ }
+
+ void DoReads(int n);
+};
+
+static const int kValueSize = 200 * 1024;
+static const int kTotalSize = 100 * 1024 * 1024;
+static const int kCount = kTotalSize / kValueSize;
+
+// Read through the first n keys repeatedly and check that they get
+// compacted (verified by checking the size of the key space).
+void AutoCompactTest::DoReads(int n) {
+ std::string value(kValueSize, 'x');
+ DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
+
+ // Fill database
+ for (int i = 0; i < kCount; i++) {
+ ASSERT_OK(db_->Put(WriteOptions(), Key(i), value));
+ }
+ ASSERT_OK(dbi->TEST_CompactMemTable());
+
+ // Delete everything
+ for (int i = 0; i < kCount; i++) {
+ ASSERT_OK(db_->Delete(WriteOptions(), Key(i)));
+ }
+ ASSERT_OK(dbi->TEST_CompactMemTable());
+
+ // Get initial measurement of the space we will be reading.
+ const int64_t initial_size = Size(Key(0), Key(n));
+ const int64_t initial_other_size = Size(Key(n), Key(kCount));
+
+ // Read until size drops significantly.
+ std::string limit_key = Key(n);
+ for (int read = 0; true; read++) {
+ ASSERT_LT(read, 100) << "Taking too long to compact";
+ Iterator* iter = db_->NewIterator(ReadOptions());
+ for (iter->SeekToFirst();
+ iter->Valid() && iter->key().ToString() < limit_key;
+ iter->Next()) {
+ // Drop data
+ }
+ delete iter;
+ // Wait a little bit to allow any triggered compactions to complete.
+ Env::Default()->SleepForMicroseconds(1000000);
+ uint64_t size = Size(Key(0), Key(n));
+ fprintf(stderr, "iter %3d => %7.3f MB [other %7.3f MB]\n",
+ read+1, size/1048576.0, Size(Key(n), Key(kCount))/1048576.0);
+ if (size <= initial_size/10) {
+ break;
+ }
+ }
+
+ // Verify that the size of the key space not touched by the reads
+ // is pretty much unchanged.
+ const int64_t final_other_size = Size(Key(n), Key(kCount));
+ ASSERT_LE(final_other_size, initial_other_size + 1048576);
+ ASSERT_GE(final_other_size, initial_other_size/5 - 1048576);
+}
+
+TEST(AutoCompactTest, ReadAll) {
+ DoReads(kCount);
+}
+
+TEST(AutoCompactTest, ReadHalf) {
+ DoReads(kCount/2);
+}
+
+} // namespace leveldb
+
+int main(int argc, char** argv) {
+ return leveldb::test::RunAllTests();
+}
diff --git a/src/leveldb/db/corruption_test.cc b/src/leveldb/db/corruption_test.cc
index 31b2d5f416..b37ffdfe64 100644
--- a/src/leveldb/db/corruption_test.cc
+++ b/src/leveldb/db/corruption_test.cc
@@ -35,6 +35,7 @@ class CorruptionTest {
CorruptionTest() {
tiny_cache_ = NewLRUCache(100);
options_.env = &env_;
+ options_.block_cache = tiny_cache_;
dbname_ = test::TmpDir() + "/db_test";
DestroyDB(dbname_, options_);
@@ -50,17 +51,14 @@ class CorruptionTest {
delete tiny_cache_;
}
- Status TryReopen(Options* options = NULL) {
+ Status TryReopen() {
delete db_;
db_ = NULL;
- Options opt = (options ? *options : options_);
- opt.env = &env_;
- opt.block_cache = tiny_cache_;
- return DB::Open(opt, dbname_, &db_);
+ return DB::Open(options_, dbname_, &db_);
}
- void Reopen(Options* options = NULL) {
- ASSERT_OK(TryReopen(options));
+ void Reopen() {
+ ASSERT_OK(TryReopen());
}
void RepairDB() {
@@ -92,6 +90,10 @@ class CorruptionTest {
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
uint64_t key;
Slice in(iter->key());
+ if (in == "" || in == "~") {
+ // Ignore boundary keys.
+ continue;
+ }
if (!ConsumeDecimalNumber(&in, &key) ||
!in.empty() ||
key < next_expected) {
@@ -233,7 +235,7 @@ TEST(CorruptionTest, TableFile) {
dbi->TEST_CompactRange(1, NULL, NULL);
Corrupt(kTableFile, 100, 1);
- Check(99, 99);
+ Check(90, 99);
}
TEST(CorruptionTest, TableFileIndexData) {
@@ -299,7 +301,7 @@ TEST(CorruptionTest, CompactionInputError) {
ASSERT_EQ(1, Property("leveldb.num-files-at-level" + NumberToString(last)));
Corrupt(kTableFile, 100, 1);
- Check(9, 9);
+ Check(5, 9);
// Force compactions by writing lots of values
Build(10000);
@@ -307,32 +309,23 @@ TEST(CorruptionTest, CompactionInputError) {
}
TEST(CorruptionTest, CompactionInputErrorParanoid) {
- Options options;
- options.paranoid_checks = true;
- options.write_buffer_size = 1048576;
- Reopen(&options);
+ options_.paranoid_checks = true;
+ options_.write_buffer_size = 512 << 10;
+ Reopen();
DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
- // Fill levels >= 1 so memtable compaction outputs to level 1
- for (int level = 1; level < config::kNumLevels; level++) {
- dbi->Put(WriteOptions(), "", "begin");
- dbi->Put(WriteOptions(), "~", "end");
+ // Make multiple inputs so we need to compact.
+ for (int i = 0; i < 2; i++) {
+ Build(10);
dbi->TEST_CompactMemTable();
+ Corrupt(kTableFile, 100, 1);
+ env_.SleepForMicroseconds(100000);
}
+ dbi->CompactRange(NULL, NULL);
- Build(10);
- dbi->TEST_CompactMemTable();
- ASSERT_EQ(1, Property("leveldb.num-files-at-level0"));
-
- Corrupt(kTableFile, 100, 1);
- Check(9, 9);
-
- // Write must eventually fail because of corrupted table
- Status s;
+ // Write must fail because of corrupted table
std::string tmp1, tmp2;
- for (int i = 0; i < 10000 && s.ok(); i++) {
- s = db_->Put(WriteOptions(), Key(i, &tmp1), Value(i, &tmp2));
- }
+ Status s = db_->Put(WriteOptions(), Key(5, &tmp1), Value(5, &tmp2));
ASSERT_TRUE(!s.ok()) << "write did not fail in corrupted paranoid db";
}
diff --git a/src/leveldb/db/db_impl.cc b/src/leveldb/db/db_impl.cc
index c9de169f29..fa1351038b 100644
--- a/src/leveldb/db/db_impl.cc
+++ b/src/leveldb/db/db_impl.cc
@@ -35,6 +35,8 @@
namespace leveldb {
+const int kNumNonTableCacheFiles = 10;
+
// Information kept for every waiting writer
struct DBImpl::Writer {
Status status;
@@ -92,9 +94,9 @@ Options SanitizeOptions(const std::string& dbname,
Options result = src;
result.comparator = icmp;
result.filter_policy = (src.filter_policy != NULL) ? ipolicy : NULL;
- ClipToRange(&result.max_open_files, 20, 50000);
- ClipToRange(&result.write_buffer_size, 64<<10, 1<<30);
- ClipToRange(&result.block_size, 1<<10, 4<<20);
+ ClipToRange(&result.max_open_files, 64 + kNumNonTableCacheFiles, 50000);
+ ClipToRange(&result.write_buffer_size, 64<<10, 1<<30);
+ ClipToRange(&result.block_size, 1<<10, 4<<20);
if (result.info_log == NULL) {
// Open a log file in the same directory as the db
src.env->CreateDir(dbname); // In case it does not exist
@@ -111,14 +113,14 @@ Options SanitizeOptions(const std::string& dbname,
return result;
}
-DBImpl::DBImpl(const Options& options, const std::string& dbname)
- : env_(options.env),
- internal_comparator_(options.comparator),
- internal_filter_policy_(options.filter_policy),
- options_(SanitizeOptions(
- dbname, &internal_comparator_, &internal_filter_policy_, options)),
- owns_info_log_(options_.info_log != options.info_log),
- owns_cache_(options_.block_cache != options.block_cache),
+DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
+ : env_(raw_options.env),
+ internal_comparator_(raw_options.comparator),
+ internal_filter_policy_(raw_options.filter_policy),
+ options_(SanitizeOptions(dbname, &internal_comparator_,
+ &internal_filter_policy_, raw_options)),
+ owns_info_log_(options_.info_log != raw_options.info_log),
+ owns_cache_(options_.block_cache != raw_options.block_cache),
dbname_(dbname),
db_lock_(NULL),
shutting_down_(NULL),
@@ -128,14 +130,16 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
logfile_(NULL),
logfile_number_(0),
log_(NULL),
+ seed_(0),
tmp_batch_(new WriteBatch),
bg_compaction_scheduled_(false),
- manual_compaction_(NULL) {
+ manual_compaction_(NULL),
+ consecutive_compaction_errors_(0) {
mem_->Ref();
has_imm_.Release_Store(NULL);
// Reserve ten files or so for other uses and give the rest to TableCache.
- const int table_cache_size = options.max_open_files - 10;
+ const int table_cache_size = options_.max_open_files - kNumNonTableCacheFiles;
table_cache_ = new TableCache(dbname_, &options_, table_cache_size);
versions_ = new VersionSet(dbname_, &options_, table_cache_,
@@ -310,16 +314,24 @@ Status DBImpl::Recover(VersionEdit* edit) {
if (!s.ok()) {
return s;
}
+ std::set<uint64_t> expected;
+ versions_->AddLiveFiles(&expected);
uint64_t number;
FileType type;
std::vector<uint64_t> logs;
for (size_t i = 0; i < filenames.size(); i++) {
- if (ParseFileName(filenames[i], &number, &type)
- && type == kLogFile
- && ((number >= min_log) || (number == prev_log))) {
- logs.push_back(number);
+ if (ParseFileName(filenames[i], &number, &type)) {
+ expected.erase(number);
+ if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
+ logs.push_back(number);
}
}
+ if (!expected.empty()) {
+ char buf[50];
+ snprintf(buf, sizeof(buf), "%d missing files; e.g.",
+ static_cast<int>(expected.size()));
+ return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
+ }
// Recover in the order in which the logs were generated
std::sort(logs.begin(), logs.end());
@@ -611,6 +623,7 @@ void DBImpl::BackgroundCall() {
Status s = BackgroundCompaction();
if (s.ok()) {
// Success
+ consecutive_compaction_errors_ = 0;
} else if (shutting_down_.Acquire_Load()) {
// Error most likely due to shutdown; do not wait
} else {
@@ -622,7 +635,12 @@ void DBImpl::BackgroundCall() {
Log(options_.info_log, "Waiting after background compaction error: %s",
s.ToString().c_str());
mutex_.Unlock();
- env_->SleepForMicroseconds(1000000);
+ ++consecutive_compaction_errors_;
+ int seconds_to_sleep = 1;
+ for (int i = 0; i < 3 && i < consecutive_compaction_errors_ - 1; ++i) {
+ seconds_to_sleep *= 2;
+ }
+ env_->SleepForMicroseconds(seconds_to_sleep * 1000000);
mutex_.Lock();
}
}
@@ -1010,7 +1028,8 @@ static void CleanupIteratorState(void* arg1, void* arg2) {
} // namespace
Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
- SequenceNumber* latest_snapshot) {
+ SequenceNumber* latest_snapshot,
+ uint32_t* seed) {
IterState* cleanup = new IterState;
mutex_.Lock();
*latest_snapshot = versions_->LastSequence();
@@ -1034,13 +1053,15 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
cleanup->version = versions_->current();
internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, NULL);
+ *seed = ++seed_;
mutex_.Unlock();
return internal_iter;
}
Iterator* DBImpl::TEST_NewInternalIterator() {
SequenceNumber ignored;
- return NewInternalIterator(ReadOptions(), &ignored);
+ uint32_t ignored_seed;
+ return NewInternalIterator(ReadOptions(), &ignored, &ignored_seed);
}
int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
@@ -1097,12 +1118,21 @@ Status DBImpl::Get(const ReadOptions& options,
Iterator* DBImpl::NewIterator(const ReadOptions& options) {
SequenceNumber latest_snapshot;
- Iterator* internal_iter = NewInternalIterator(options, &latest_snapshot);
+ uint32_t seed;
+ Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed);
return NewDBIterator(
- &dbname_, env_, user_comparator(), internal_iter,
+ this, user_comparator(), iter,
(options.snapshot != NULL
? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
- : latest_snapshot));
+ : latest_snapshot),
+ seed);
+}
+
+void DBImpl::RecordReadSample(Slice key) {
+ MutexLock l(&mutex_);
+ if (versions_->current()->RecordReadSample(key)) {
+ MaybeScheduleCompaction();
+ }
}
const Snapshot* DBImpl::GetSnapshot() {
@@ -1268,10 +1298,11 @@ Status DBImpl::MakeRoomForWrite(bool force) {
} else if (imm_ != NULL) {
// We have filled up the current memtable, but the previous
// one is still being compacted, so we wait.
+ Log(options_.info_log, "Current memtable full; waiting...\n");
bg_cv_.Wait();
} else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
// There are too many level-0 files.
- Log(options_.info_log, "waiting...\n");
+ Log(options_.info_log, "Too many L0 files; waiting...\n");
bg_cv_.Wait();
} else {
// Attempt to switch to a new memtable and trigger compaction of old
diff --git a/src/leveldb/db/db_impl.h b/src/leveldb/db/db_impl.h
index bd29dd8055..75fd30abe9 100644
--- a/src/leveldb/db/db_impl.h
+++ b/src/leveldb/db/db_impl.h
@@ -59,13 +59,19 @@ class DBImpl : public DB {
// file at a level >= 1.
int64_t TEST_MaxNextLevelOverlappingBytes();
+ // Record a sample of bytes read at the specified internal key.
+ // Samples are taken approximately once every config::kReadBytesPeriod
+ // bytes.
+ void RecordReadSample(Slice key);
+
private:
friend class DB;
struct CompactionState;
struct Writer;
Iterator* NewInternalIterator(const ReadOptions&,
- SequenceNumber* latest_snapshot);
+ SequenceNumber* latest_snapshot,
+ uint32_t* seed);
Status NewDB();
@@ -135,6 +141,7 @@ class DBImpl : public DB {
WritableFile* logfile_;
uint64_t logfile_number_;
log::Writer* log_;
+ uint32_t seed_; // For sampling.
// Queue of writers.
std::deque<Writer*> writers_;
@@ -163,6 +170,7 @@ class DBImpl : public DB {
// Have we encountered a background error in paranoid mode?
Status bg_error_;
+ int consecutive_compaction_errors_;
// Per level compaction stats. stats_[level] stores the stats for
// compactions that produced data for the specified "level".
diff --git a/src/leveldb/db/db_iter.cc b/src/leveldb/db/db_iter.cc
index 87dca2ded4..071a54e3f4 100644
--- a/src/leveldb/db/db_iter.cc
+++ b/src/leveldb/db/db_iter.cc
@@ -5,12 +5,14 @@
#include "db/db_iter.h"
#include "db/filename.h"
+#include "db/db_impl.h"
#include "db/dbformat.h"
#include "leveldb/env.h"
#include "leveldb/iterator.h"
#include "port/port.h"
#include "util/logging.h"
#include "util/mutexlock.h"
+#include "util/random.h"
namespace leveldb {
@@ -46,15 +48,16 @@ class DBIter: public Iterator {
kReverse
};
- DBIter(const std::string* dbname, Env* env,
- const Comparator* cmp, Iterator* iter, SequenceNumber s)
- : dbname_(dbname),
- env_(env),
+ DBIter(DBImpl* db, const Comparator* cmp, Iterator* iter, SequenceNumber s,
+ uint32_t seed)
+ : db_(db),
user_comparator_(cmp),
iter_(iter),
sequence_(s),
direction_(kForward),
- valid_(false) {
+ valid_(false),
+ rnd_(seed),
+ bytes_counter_(RandomPeriod()) {
}
virtual ~DBIter() {
delete iter_;
@@ -100,8 +103,12 @@ class DBIter: public Iterator {
}
}
- const std::string* const dbname_;
- Env* const env_;
+ // Pick next gap with average value of config::kReadBytesPeriod.
+ ssize_t RandomPeriod() {
+ return rnd_.Uniform(2*config::kReadBytesPeriod);
+ }
+
+ DBImpl* db_;
const Comparator* const user_comparator_;
Iterator* const iter_;
SequenceNumber const sequence_;
@@ -112,13 +119,23 @@ class DBIter: public Iterator {
Direction direction_;
bool valid_;
+ Random rnd_;
+ ssize_t bytes_counter_;
+
// No copying allowed
DBIter(const DBIter&);
void operator=(const DBIter&);
};
inline bool DBIter::ParseKey(ParsedInternalKey* ikey) {
- if (!ParseInternalKey(iter_->key(), ikey)) {
+ Slice k = iter_->key();
+ ssize_t n = k.size() + iter_->value().size();
+ bytes_counter_ -= n;
+ while (bytes_counter_ < 0) {
+ bytes_counter_ += RandomPeriod();
+ db_->RecordReadSample(k);
+ }
+ if (!ParseInternalKey(k, ikey)) {
status_ = Status::Corruption("corrupted internal key in DBIter");
return false;
} else {
@@ -288,12 +305,12 @@ void DBIter::SeekToLast() {
} // anonymous namespace
Iterator* NewDBIterator(
- const std::string* dbname,
- Env* env,
+ DBImpl* db,
const Comparator* user_key_comparator,
Iterator* internal_iter,
- const SequenceNumber& sequence) {
- return new DBIter(dbname, env, user_key_comparator, internal_iter, sequence);
+ SequenceNumber sequence,
+ uint32_t seed) {
+ return new DBIter(db, user_key_comparator, internal_iter, sequence, seed);
}
} // namespace leveldb
diff --git a/src/leveldb/db/db_iter.h b/src/leveldb/db/db_iter.h
index d9e1b174ab..04927e937b 100644
--- a/src/leveldb/db/db_iter.h
+++ b/src/leveldb/db/db_iter.h
@@ -11,15 +11,17 @@
namespace leveldb {
+class DBImpl;
+
// Return a new iterator that converts internal keys (yielded by
// "*internal_iter") that were live at the specified "sequence" number
// into appropriate user keys.
extern Iterator* NewDBIterator(
- const std::string* dbname,
- Env* env,
+ DBImpl* db,
const Comparator* user_key_comparator,
Iterator* internal_iter,
- const SequenceNumber& sequence);
+ SequenceNumber sequence,
+ uint32_t seed);
} // namespace leveldb
diff --git a/src/leveldb/db/db_test.cc b/src/leveldb/db/db_test.cc
index 684ea3bdbc..49aae04dbd 100644
--- a/src/leveldb/db/db_test.cc
+++ b/src/leveldb/db/db_test.cc
@@ -33,8 +33,11 @@ class AtomicCounter {
public:
AtomicCounter() : count_(0) { }
void Increment() {
+ IncrementBy(1);
+ }
+ void IncrementBy(int count) {
MutexLock l(&mu_);
- count_++;
+ count_ += count;
}
int Read() {
MutexLock l(&mu_);
@@ -45,6 +48,10 @@ class AtomicCounter {
count_ = 0;
}
};
+
+void DelayMilliseconds(int millis) {
+ Env::Default()->SleepForMicroseconds(millis * 1000);
+}
}
// Special Env used to delay background operations
@@ -69,6 +76,7 @@ class SpecialEnv : public EnvWrapper {
AtomicCounter random_read_counter_;
AtomicCounter sleep_counter_;
+ AtomicCounter sleep_time_counter_;
explicit SpecialEnv(Env* base) : EnvWrapper(base) {
delay_sstable_sync_.Release_Store(NULL);
@@ -103,7 +111,7 @@ class SpecialEnv : public EnvWrapper {
Status Flush() { return base_->Flush(); }
Status Sync() {
while (env_->delay_sstable_sync_.Acquire_Load() != NULL) {
- env_->SleepForMicroseconds(100000);
+ DelayMilliseconds(100);
}
return base_->Sync();
}
@@ -174,8 +182,9 @@ class SpecialEnv : public EnvWrapper {
virtual void SleepForMicroseconds(int micros) {
sleep_counter_.Increment();
- target()->SleepForMicroseconds(micros);
+ sleep_time_counter_.IncrementBy(micros);
}
+
};
class DBTest {
@@ -461,6 +470,20 @@ class DBTest {
}
return result;
}
+
+ bool DeleteAnSSTFile() {
+ std::vector<std::string> filenames;
+ ASSERT_OK(env_->GetChildren(dbname_, &filenames));
+ uint64_t number;
+ FileType type;
+ for (size_t i = 0; i < filenames.size(); i++) {
+ if (ParseFileName(filenames[i], &number, &type) && type == kTableFile) {
+ ASSERT_OK(env_->DeleteFile(TableFileName(dbname_, number)));
+ return true;
+ }
+ }
+ return false;
+ }
};
TEST(DBTest, Empty) {
@@ -611,7 +634,7 @@ TEST(DBTest, GetEncountersEmptyLevel) {
}
// Step 4: Wait for compaction to finish
- env_->SleepForMicroseconds(1000000);
+ DelayMilliseconds(1000);
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
} while (ChangeOptions());
@@ -1295,7 +1318,7 @@ TEST(DBTest, L0_CompactionBug_Issue44_a) {
Reopen();
Reopen();
ASSERT_EQ("(a->v)", Contents());
- env_->SleepForMicroseconds(1000000); // Wait for compaction to finish
+ DelayMilliseconds(1000); // Wait for compaction to finish
ASSERT_EQ("(a->v)", Contents());
}
@@ -1311,7 +1334,7 @@ TEST(DBTest, L0_CompactionBug_Issue44_b) {
Put("","");
Reopen();
Put("","");
- env_->SleepForMicroseconds(1000000); // Wait for compaction to finish
+ DelayMilliseconds(1000); // Wait for compaction to finish
Reopen();
Put("d","dv");
Reopen();
@@ -1321,7 +1344,7 @@ TEST(DBTest, L0_CompactionBug_Issue44_b) {
Delete("b");
Reopen();
ASSERT_EQ("(->)(c->cv)", Contents());
- env_->SleepForMicroseconds(1000000); // Wait for compaction to finish
+ DelayMilliseconds(1000); // Wait for compaction to finish
ASSERT_EQ("(->)(c->cv)", Contents());
}
@@ -1506,6 +1529,30 @@ TEST(DBTest, NoSpace) {
ASSERT_GE(env_->sleep_counter_.Read(), 5);
}
+TEST(DBTest, ExponentialBackoff) {
+ Options options = CurrentOptions();
+ options.env = env_;
+ Reopen(&options);
+
+ ASSERT_OK(Put("foo", "v1"));
+ ASSERT_EQ("v1", Get("foo"));
+ Compact("a", "z");
+ env_->non_writable_.Release_Store(env_); // Force errors for new files
+ env_->sleep_counter_.Reset();
+ env_->sleep_time_counter_.Reset();
+ for (int i = 0; i < 5; i++) {
+ dbfull()->TEST_CompactRange(2, NULL, NULL);
+ }
+ env_->non_writable_.Release_Store(NULL);
+
+ // Wait for compaction to finish
+ DelayMilliseconds(1000);
+
+ ASSERT_GE(env_->sleep_counter_.Read(), 5);
+ ASSERT_LT(env_->sleep_counter_.Read(), 10);
+ ASSERT_GE(env_->sleep_time_counter_.Read(), 10e6);
+}
+
TEST(DBTest, NonWritableFileSystem) {
Options options = CurrentOptions();
options.write_buffer_size = 1000;
@@ -1519,7 +1566,7 @@ TEST(DBTest, NonWritableFileSystem) {
fprintf(stderr, "iter %d; errors %d\n", i, errors);
if (!Put("foo", big).ok()) {
errors++;
- env_->SleepForMicroseconds(100000);
+ DelayMilliseconds(100);
}
}
ASSERT_GT(errors, 0);
@@ -1567,6 +1614,24 @@ TEST(DBTest, ManifestWriteError) {
}
}
+TEST(DBTest, MissingSSTFile) {
+ ASSERT_OK(Put("foo", "bar"));
+ ASSERT_EQ("bar", Get("foo"));
+
+ // Dump the memtable to disk.
+ dbfull()->TEST_CompactMemTable();
+ ASSERT_EQ("bar", Get("foo"));
+
+ Close();
+ ASSERT_TRUE(DeleteAnSSTFile());
+ Options options = CurrentOptions();
+ options.paranoid_checks = true;
+ Status s = TryReopen(&options);
+ ASSERT_TRUE(!s.ok());
+ ASSERT_TRUE(s.ToString().find("issing") != std::string::npos)
+ << s.ToString();
+}
+
TEST(DBTest, FilesDeletedAfterCompaction) {
ASSERT_OK(Put("foo", "v2"));
Compact("a", "z");
@@ -1711,13 +1776,13 @@ TEST(DBTest, MultiThreaded) {
}
// Let them run for a while
- env_->SleepForMicroseconds(kTestSeconds * 1000000);
+ DelayMilliseconds(kTestSeconds * 1000);
// Stop the threads and wait for them to finish
mt.stop.Release_Store(&mt);
for (int id = 0; id < kNumThreads; id++) {
while (mt.thread_done[id].Acquire_Load() == NULL) {
- env_->SleepForMicroseconds(100000);
+ DelayMilliseconds(100);
}
}
} while (ChangeOptions());
diff --git a/src/leveldb/db/dbformat.cc b/src/leveldb/db/dbformat.cc
index 28e11b398d..20a7ca4462 100644
--- a/src/leveldb/db/dbformat.cc
+++ b/src/leveldb/db/dbformat.cc
@@ -26,7 +26,7 @@ std::string ParsedInternalKey::DebugString() const {
(unsigned long long) sequence,
int(type));
std::string result = "'";
- result += user_key.ToString();
+ result += EscapeString(user_key.ToString());
result += buf;
return result;
}
diff --git a/src/leveldb/db/dbformat.h b/src/leveldb/db/dbformat.h
index f7f64dafb6..5d8a032bd3 100644
--- a/src/leveldb/db/dbformat.h
+++ b/src/leveldb/db/dbformat.h
@@ -38,6 +38,9 @@ static const int kL0_StopWritesTrigger = 12;
// space if the same key space is being repeatedly overwritten.
static const int kMaxMemCompactLevel = 2;
+// Approximate gap in bytes between samples of data read during iteration.
+static const int kReadBytesPeriod = 1048576;
+
} // namespace config
class InternalKey;
diff --git a/src/leveldb/db/filename_test.cc b/src/leveldb/db/filename_test.cc
index 47353d6c9a..5a26da4728 100644
--- a/src/leveldb/db/filename_test.cc
+++ b/src/leveldb/db/filename_test.cc
@@ -70,7 +70,7 @@ TEST(FileNameTest, Parse) {
for (int i = 0; i < sizeof(errors) / sizeof(errors[0]); i++) {
std::string f = errors[i];
ASSERT_TRUE(!ParseFileName(f, &number, &type)) << f;
- };
+ }
}
TEST(FileNameTest, Construction) {
diff --git a/src/leveldb/db/version_set.cc b/src/leveldb/db/version_set.cc
index 7d0a5de2b9..66d73be71f 100644
--- a/src/leveldb/db/version_set.cc
+++ b/src/leveldb/db/version_set.cc
@@ -289,6 +289,51 @@ static bool NewestFirst(FileMetaData* a, FileMetaData* b) {
return a->number > b->number;
}
+void Version::ForEachOverlapping(Slice user_key, Slice internal_key,
+ void* arg,
+ bool (*func)(void*, int, FileMetaData*)) {
+ // TODO(sanjay): Change Version::Get() to use this function.
+ const Comparator* ucmp = vset_->icmp_.user_comparator();
+
+ // Search level-0 in order from newest to oldest.
+ std::vector<FileMetaData*> tmp;
+ tmp.reserve(files_[0].size());
+ for (uint32_t i = 0; i < files_[0].size(); i++) {
+ FileMetaData* f = files_[0][i];
+ if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 &&
+ ucmp->Compare(user_key, f->largest.user_key()) <= 0) {
+ tmp.push_back(f);
+ }
+ }
+ if (!tmp.empty()) {
+ std::sort(tmp.begin(), tmp.end(), NewestFirst);
+ for (uint32_t i = 0; i < tmp.size(); i++) {
+ if (!(*func)(arg, 0, tmp[i])) {
+ return;
+ }
+ }
+ }
+
+ // Search other levels.
+ for (int level = 1; level < config::kNumLevels; level++) {
+ size_t num_files = files_[level].size();
+ if (num_files == 0) continue;
+
+ // Binary search to find earliest index whose largest key >= internal_key.
+ uint32_t index = FindFile(vset_->icmp_, files_[level], internal_key);
+ if (index < num_files) {
+ FileMetaData* f = files_[level][index];
+ if (ucmp->Compare(user_key, f->smallest.user_key()) < 0) {
+ // All of "f" is past any data for user_key
+ } else {
+ if (!(*func)(arg, level, f)) {
+ return;
+ }
+ }
+ }
+ }
+}
+
Status Version::Get(const ReadOptions& options,
const LookupKey& k,
std::string* value,
@@ -401,6 +446,44 @@ bool Version::UpdateStats(const GetStats& stats) {
return false;
}
+bool Version::RecordReadSample(Slice internal_key) {
+ ParsedInternalKey ikey;
+ if (!ParseInternalKey(internal_key, &ikey)) {
+ return false;
+ }
+
+ struct State {
+ GetStats stats; // Holds first matching file
+ int matches;
+
+ static bool Match(void* arg, int level, FileMetaData* f) {
+ State* state = reinterpret_cast<State*>(arg);
+ state->matches++;
+ if (state->matches == 1) {
+ // Remember first match.
+ state->stats.seek_file = f;
+ state->stats.seek_file_level = level;
+ }
+ // We can stop iterating once we have a second match.
+ return state->matches < 2;
+ }
+ };
+
+ State state;
+ state.matches = 0;
+ ForEachOverlapping(ikey.user_key, internal_key, &state, &State::Match);
+
+ // Must have at least two matches since we want to merge across
+ // files. But what if we have a single file that contains many
+ // overwrites and deletions? Should we have another mechanism for
+ // finding such files?
+ if (state.matches >= 2) {
+ // 1MB cost is about 1 seek (see comment in Builder::Apply).
+ return UpdateStats(state.stats);
+ }
+ return false;
+}
+
void Version::Ref() {
++refs_;
}
@@ -435,10 +518,13 @@ int Version::PickLevelForMemTableOutput(
if (OverlapInLevel(level + 1, &smallest_user_key, &largest_user_key)) {
break;
}
- GetOverlappingInputs(level + 2, &start, &limit, &overlaps);
- const int64_t sum = TotalFileSize(overlaps);
- if (sum > kMaxGrandParentOverlapBytes) {
- break;
+ if (level + 2 < config::kNumLevels) {
+ // Check that file does not overlap too many grandparent bytes.
+ GetOverlappingInputs(level + 2, &start, &limit, &overlaps);
+ const int64_t sum = TotalFileSize(overlaps);
+ if (sum > kMaxGrandParentOverlapBytes) {
+ break;
+ }
}
level++;
}
@@ -452,6 +538,8 @@ void Version::GetOverlappingInputs(
const InternalKey* begin,
const InternalKey* end,
std::vector<FileMetaData*>* inputs) {
+ assert(level >= 0);
+ assert(level < config::kNumLevels);
inputs->clear();
Slice user_begin, user_end;
if (begin != NULL) {
@@ -1331,14 +1419,19 @@ Compaction* VersionSet::CompactRange(
}
// Avoid compacting too much in one shot in case the range is large.
- const uint64_t limit = MaxFileSizeForLevel(level);
- uint64_t total = 0;
- for (size_t i = 0; i < inputs.size(); i++) {
- uint64_t s = inputs[i]->file_size;
- total += s;
- if (total >= limit) {
- inputs.resize(i + 1);
- break;
+ // But we cannot do this for level-0 since level-0 files can overlap
+ // and we must not pick one file and drop another older file if the
+ // two files overlap.
+ if (level > 0) {
+ const uint64_t limit = MaxFileSizeForLevel(level);
+ uint64_t total = 0;
+ for (size_t i = 0; i < inputs.size(); i++) {
+ uint64_t s = inputs[i]->file_size;
+ total += s;
+ if (total >= limit) {
+ inputs.resize(i + 1);
+ break;
+ }
}
}
diff --git a/src/leveldb/db/version_set.h b/src/leveldb/db/version_set.h
index 9d084fdb7d..20de0e2629 100644
--- a/src/leveldb/db/version_set.h
+++ b/src/leveldb/db/version_set.h
@@ -78,6 +78,12 @@ class Version {
// REQUIRES: lock is held
bool UpdateStats(const GetStats& stats);
+ // Record a sample of bytes read at the specified internal key.
+ // Samples are taken approximately once every config::kReadBytesPeriod
+ // bytes. Returns true if a new compaction may need to be triggered.
+ // REQUIRES: lock is held
+ bool RecordReadSample(Slice key);
+
// Reference count management (so Versions do not disappear out from
// under live iterators)
void Ref();
@@ -114,6 +120,15 @@ class Version {
class LevelFileNumIterator;
Iterator* NewConcatenatingIterator(const ReadOptions&, int level) const;
+ // Call func(arg, level, f) for every file that overlaps user_key in
+ // order from newest to oldest. If an invocation of func returns
+ // false, makes no more calls.
+ //
+ // REQUIRES: user portion of internal_key == user_key.
+ void ForEachOverlapping(Slice user_key, Slice internal_key,
+ void* arg,
+ bool (*func)(void*, int, FileMetaData*));
+
VersionSet* vset_; // VersionSet to which this Version belongs
Version* next_; // Next version in linked list
Version* prev_; // Previous version in linked list
diff --git a/src/leveldb/include/leveldb/db.h b/src/leveldb/include/leveldb/db.h
index 29d3674479..57c00a5da0 100644
--- a/src/leveldb/include/leveldb/db.h
+++ b/src/leveldb/include/leveldb/db.h
@@ -14,7 +14,7 @@ namespace leveldb {
// Update Makefile if you change these
static const int kMajorVersion = 1;
-static const int kMinorVersion = 9;
+static const int kMinorVersion = 13;
struct Options;
struct ReadOptions;
diff --git a/src/leveldb/issues/issue178_test.cc b/src/leveldb/issues/issue178_test.cc
new file mode 100644
index 0000000000..1b1cf8bb28
--- /dev/null
+++ b/src/leveldb/issues/issue178_test.cc
@@ -0,0 +1,92 @@
+// Copyright (c) 2013 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+// Test for issue 178: a manual compaction causes deleted data to reappear.
+#include <iostream>
+#include <sstream>
+#include <cstdlib>
+
+#include "leveldb/db.h"
+#include "leveldb/write_batch.h"
+#include "util/testharness.h"
+
+namespace {
+
+const int kNumKeys = 1100000;
+
+std::string Key1(int i) {
+ char buf[100];
+ snprintf(buf, sizeof(buf), "my_key_%d", i);
+ return buf;
+}
+
+std::string Key2(int i) {
+ return Key1(i) + "_xxx";
+}
+
+class Issue178 { };
+
+TEST(Issue178, Test) {
+ // Get rid of any state from an old run.
+ std::string dbpath = leveldb::test::TmpDir() + "/leveldb_cbug_test";
+ DestroyDB(dbpath, leveldb::Options());
+
+ // Open database. Disable compression since it affects the creation
+ // of layers and the code below is trying to test against a very
+ // specific scenario.
+ leveldb::DB* db;
+ leveldb::Options db_options;
+ db_options.create_if_missing = true;
+ db_options.compression = leveldb::kNoCompression;
+ ASSERT_OK(leveldb::DB::Open(db_options, dbpath, &db));
+
+ // create first key range
+ leveldb::WriteBatch batch;
+ for (size_t i = 0; i < kNumKeys; i++) {
+ batch.Put(Key1(i), "value for range 1 key");
+ }
+ ASSERT_OK(db->Write(leveldb::WriteOptions(), &batch));
+
+ // create second key range
+ batch.Clear();
+ for (size_t i = 0; i < kNumKeys; i++) {
+ batch.Put(Key2(i), "value for range 2 key");
+ }
+ ASSERT_OK(db->Write(leveldb::WriteOptions(), &batch));
+
+ // delete second key range
+ batch.Clear();
+ for (size_t i = 0; i < kNumKeys; i++) {
+ batch.Delete(Key2(i));
+ }
+ ASSERT_OK(db->Write(leveldb::WriteOptions(), &batch));
+
+ // compact database
+ std::string start_key = Key1(0);
+ std::string end_key = Key1(kNumKeys - 1);
+ leveldb::Slice least(start_key.data(), start_key.size());
+ leveldb::Slice greatest(end_key.data(), end_key.size());
+
+ // commenting out the line below causes the example to work correctly
+ db->CompactRange(&least, &greatest);
+
+ // count the keys
+ leveldb::Iterator* iter = db->NewIterator(leveldb::ReadOptions());
+ size_t num_keys = 0;
+ for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+ num_keys++;
+ }
+ delete iter;
+ ASSERT_EQ(kNumKeys, num_keys) << "Bad number of keys";
+
+ // close database
+ delete db;
+ DestroyDB(dbpath, leveldb::Options());
+}
+
+} // anonymous namespace
+
+int main(int argc, char** argv) {
+ return leveldb::test::RunAllTests();
+}
diff --git a/src/leveldb/port/port_posix.h b/src/leveldb/port/port_posix.h
index f2b89bffb9..21c845e211 100644
--- a/src/leveldb/port/port_posix.h
+++ b/src/leveldb/port/port_posix.h
@@ -62,12 +62,16 @@
#define fflush_unlocked fflush
#endif
-#if defined(OS_MACOSX) || defined(OS_FREEBSD) ||\
+#if defined(OS_FREEBSD) ||\
defined(OS_OPENBSD) || defined(OS_DRAGONFLYBSD)
// Use fsync() on platforms without fdatasync()
#define fdatasync fsync
#endif
+#if defined(OS_MACOSX)
+#define fdatasync(fd) fcntl(fd, F_FULLFSYNC, 0)
+#endif
+
#if defined(OS_ANDROID) && __ANDROID_API__ < 9
// fdatasync() was only introduced in API level 9 on Android. Use fsync()
// when targetting older platforms.
diff --git a/src/leveldb/port/port_win.cc b/src/leveldb/port/port_win.cc
index 99c1d8e346..1b0f060a19 100644
--- a/src/leveldb/port/port_win.cc
+++ b/src/leveldb/port/port_win.cc
@@ -109,12 +109,10 @@ void CondVar::Signal() {
void CondVar::SignalAll() {
wait_mtx_.Lock();
- for(long i = 0; i < waiting_; ++i) {
- ::ReleaseSemaphore(sem1_, 1, NULL);
- while(waiting_ > 0) {
- --waiting_;
- ::WaitForSingleObject(sem2_, INFINITE);
- }
+ ::ReleaseSemaphore(sem1_, waiting_, NULL);
+ while(waiting_ > 0) {
+ --waiting_;
+ ::WaitForSingleObject(sem2_, INFINITE);
}
wait_mtx_.Unlock();
}
diff --git a/src/leveldb/table/block.cc b/src/leveldb/table/block.cc
index ab83c1112c..79ea9d9ee5 100644
--- a/src/leveldb/table/block.cc
+++ b/src/leveldb/table/block.cc
@@ -16,7 +16,7 @@
namespace leveldb {
inline uint32_t Block::NumRestarts() const {
- assert(size_ >= 2*sizeof(uint32_t));
+ assert(size_ >= sizeof(uint32_t));
return DecodeFixed32(data_ + size_ - sizeof(uint32_t));
}
@@ -27,11 +27,12 @@ Block::Block(const BlockContents& contents)
if (size_ < sizeof(uint32_t)) {
size_ = 0; // Error marker
} else {
- restart_offset_ = size_ - (1 + NumRestarts()) * sizeof(uint32_t);
- if (restart_offset_ > size_ - sizeof(uint32_t)) {
- // The size is too small for NumRestarts() and therefore
- // restart_offset_ wrapped around.
+ size_t max_restarts_allowed = (size_-sizeof(uint32_t)) / sizeof(uint32_t);
+ if (NumRestarts() > max_restarts_allowed) {
+ // The size is too small for NumRestarts()
size_ = 0;
+ } else {
+ restart_offset_ = size_ - (1 + NumRestarts()) * sizeof(uint32_t);
}
}
}
@@ -253,7 +254,7 @@ class Block::Iter : public Iterator {
};
Iterator* Block::NewIterator(const Comparator* cmp) {
- if (size_ < 2*sizeof(uint32_t)) {
+ if (size_ < sizeof(uint32_t)) {
return NewErrorIterator(Status::Corruption("bad block contents"));
}
const uint32_t num_restarts = NumRestarts();
diff --git a/src/leveldb/table/table.cc b/src/leveldb/table/table.cc
index dbd6d3a1bf..71c1756e5f 100644
--- a/src/leveldb/table/table.cc
+++ b/src/leveldb/table/table.cc
@@ -228,7 +228,6 @@ Status Table::InternalGet(const ReadOptions& options, const Slice& k,
!filter->KeyMayMatch(handle.offset(), k)) {
// Not found
} else {
- Slice handle = iiter->value();
Iterator* block_iter = BlockReader(this, options, iiter->value());
block_iter->Seek(k);
if (block_iter->Valid()) {
diff --git a/src/leveldb/table/table_test.cc b/src/leveldb/table/table_test.cc
index 57cea25334..c723bf84cf 100644
--- a/src/leveldb/table/table_test.cc
+++ b/src/leveldb/table/table_test.cc
@@ -644,6 +644,36 @@ class Harness {
Constructor* constructor_;
};
+// Test empty table/block.
+TEST(Harness, Empty) {
+ for (int i = 0; i < kNumTestArgs; i++) {
+ Init(kTestArgList[i]);
+ Random rnd(test::RandomSeed() + 1);
+ Test(&rnd);
+ }
+}
+
+// Special test for a block with no restart entries. The C++ leveldb
+// code never generates such blocks, but the Java version of leveldb
+// seems to.
+TEST(Harness, ZeroRestartPointsInBlock) {
+ char data[sizeof(uint32_t)];
+ memset(data, 0, sizeof(data));
+ BlockContents contents;
+ contents.data = Slice(data, sizeof(data));
+ contents.cachable = false;
+ contents.heap_allocated = false;
+ Block block(contents);
+ Iterator* iter = block.NewIterator(BytewiseComparator());
+ iter->SeekToFirst();
+ ASSERT_TRUE(!iter->Valid());
+ iter->SeekToLast();
+ ASSERT_TRUE(!iter->Valid());
+ iter->Seek("foo");
+ ASSERT_TRUE(!iter->Valid());
+ delete iter;
+}
+
// Test the empty key
TEST(Harness, SimpleEmptyKey) {
for (int i = 0; i < kNumTestArgs; i++) {
diff --git a/src/leveldb/util/cache.cc b/src/leveldb/util/cache.cc
index 24f1f63f4f..8b197bc02a 100644
--- a/src/leveldb/util/cache.cc
+++ b/src/leveldb/util/cache.cc
@@ -116,7 +116,6 @@ class HandleTable {
LRUHandle* h = list_[i];
while (h != NULL) {
LRUHandle* next = h->next_hash;
- Slice key = h->key();
uint32_t hash = h->hash;
LRUHandle** ptr = &new_list[hash & (new_length - 1)];
h->next_hash = *ptr;
@@ -160,7 +159,6 @@ class LRUCache {
// mutex_ protects the following state.
port::Mutex mutex_;
size_t usage_;
- uint64_t last_id_;
// Dummy head of LRU list.
// lru.prev is newest entry, lru.next is oldest entry.
@@ -170,8 +168,7 @@ class LRUCache {
};
LRUCache::LRUCache()
- : usage_(0),
- last_id_(0) {
+ : usage_(0) {
// Make empty circular linked list
lru_.next = &lru_;
lru_.prev = &lru_;
diff --git a/src/leveldb/util/coding_test.cc b/src/leveldb/util/coding_test.cc
index 2c52b17b60..fb5726e335 100644
--- a/src/leveldb/util/coding_test.cc
+++ b/src/leveldb/util/coding_test.cc
@@ -109,7 +109,7 @@ TEST(Coding, Varint64) {
values.push_back(power);
values.push_back(power-1);
values.push_back(power+1);
- };
+ }
std::string s;
for (int i = 0; i < values.size(); i++) {
diff --git a/src/leveldb/util/env_posix.cc b/src/leveldb/util/env_posix.cc
index db81f56d11..0f5dcfac5a 100644
--- a/src/leveldb/util/env_posix.cc
+++ b/src/leveldb/util/env_posix.cc
@@ -320,8 +320,39 @@ class PosixMmapFile : public WritableFile {
return Status::OK();
}
- virtual Status Sync() {
+ Status SyncDirIfManifest() {
+ const char* f = filename_.c_str();
+ const char* sep = strrchr(f, '/');
+ Slice basename;
+ std::string dir;
+ if (sep == NULL) {
+ dir = ".";
+ basename = f;
+ } else {
+ dir = std::string(f, sep - f);
+ basename = sep + 1;
+ }
Status s;
+ if (basename.starts_with("MANIFEST")) {
+ int fd = open(dir.c_str(), O_RDONLY);
+ if (fd < 0) {
+ s = IOError(dir, errno);
+ } else {
+ if (fsync(fd) < 0) {
+ s = IOError(dir, errno);
+ }
+ close(fd);
+ }
+ }
+ return s;
+ }
+
+ virtual Status Sync() {
+ // Ensure new files referred to by the manifest are in the filesystem.
+ Status s = SyncDirIfManifest();
+ if (!s.ok()) {
+ return s;
+ }
if (pending_sync_) {
// Some unmapped data was not synced
@@ -386,7 +417,7 @@ class PosixEnv : public Env {
PosixEnv();
virtual ~PosixEnv() {
fprintf(stderr, "Destroying Env::Default()\n");
- exit(1);
+ abort();
}
virtual Status NewSequentialFile(const std::string& fname,
@@ -467,7 +498,7 @@ class PosixEnv : public Env {
result = IOError(fname, errno);
}
return result;
- };
+ }
virtual Status CreateDir(const std::string& name) {
Status result;
@@ -475,7 +506,7 @@ class PosixEnv : public Env {
result = IOError(name, errno);
}
return result;
- };
+ }
virtual Status DeleteDir(const std::string& name) {
Status result;
@@ -483,7 +514,7 @@ class PosixEnv : public Env {
result = IOError(name, errno);
}
return result;
- };
+ }
virtual Status GetFileSize(const std::string& fname, uint64_t* size) {
Status s;
@@ -589,7 +620,7 @@ class PosixEnv : public Env {
void PthreadCall(const char* label, int result) {
if (result != 0) {
fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
- exit(1);
+ abort();
}
}
diff --git a/src/leveldb/util/hash.cc b/src/leveldb/util/hash.cc
index ba1818082d..07cf022060 100644
--- a/src/leveldb/util/hash.cc
+++ b/src/leveldb/util/hash.cc
@@ -6,6 +6,13 @@
#include "util/coding.h"
#include "util/hash.h"
+// The FALLTHROUGH_INTENDED macro can be used to annotate implicit fall-through
+// between switch labels. The real definition should be provided externally.
+// This one is a fallback version for unsupported compilers.
+#ifndef FALLTHROUGH_INTENDED
+#define FALLTHROUGH_INTENDED do { } while (0)
+#endif
+
namespace leveldb {
uint32_t Hash(const char* data, size_t n, uint32_t seed) {
@@ -28,10 +35,10 @@ uint32_t Hash(const char* data, size_t n, uint32_t seed) {
switch (limit - data) {
case 3:
h += data[2] << 16;
- // fall through
+ FALLTHROUGH_INTENDED;
case 2:
h += data[1] << 8;
- // fall through
+ FALLTHROUGH_INTENDED;
case 1:
h += data[0];
h *= m;
diff --git a/src/leveldb/util/random.h b/src/leveldb/util/random.h
index 07538242ea..ddd51b1c7b 100644
--- a/src/leveldb/util/random.h
+++ b/src/leveldb/util/random.h
@@ -16,7 +16,12 @@ class Random {
private:
uint32_t seed_;
public:
- explicit Random(uint32_t s) : seed_(s & 0x7fffffffu) { }
+ explicit Random(uint32_t s) : seed_(s & 0x7fffffffu) {
+ // Avoid bad seeds.
+ if (seed_ == 0 || seed_ == 2147483647L) {
+ seed_ = 1;
+ }
+ }
uint32_t Next() {
static const uint32_t M = 2147483647L; // 2^31-1
static const uint64_t A = 16807; // bits 14, 8, 7, 5, 2, 1, 0