diff options
Diffstat (limited to 'src/leveldb/table')
-rw-r--r-- | src/leveldb/table/block.cc | 267 | ||||
-rw-r--r-- | src/leveldb/table/block.h | 44 | ||||
-rw-r--r-- | src/leveldb/table/block_builder.cc | 108 | ||||
-rw-r--r-- | src/leveldb/table/block_builder.h | 55 | ||||
-rw-r--r-- | src/leveldb/table/filter_block.cc | 106 | ||||
-rw-r--r-- | src/leveldb/table/filter_block.h | 69 | ||||
-rw-r--r-- | src/leveldb/table/filter_block_test.cc | 124 | ||||
-rw-r--r-- | src/leveldb/table/format.cc | 141 | ||||
-rw-r--r-- | src/leveldb/table/format.h | 100 | ||||
-rw-r--r-- | src/leveldb/table/iterator.cc | 76 | ||||
-rw-r--r-- | src/leveldb/table/iterator_wrapper.h | 92 | ||||
-rw-r--r-- | src/leveldb/table/merger.cc | 191 | ||||
-rw-r--r-- | src/leveldb/table/merger.h | 26 | ||||
-rw-r--r-- | src/leveldb/table/table.cc | 273 | ||||
-rw-r--r-- | src/leveldb/table/table_builder.cc | 265 | ||||
-rw-r--r-- | src/leveldb/table/table_test.cc | 837 | ||||
-rw-r--r-- | src/leveldb/table/two_level_iterator.cc | 171 | ||||
-rw-r--r-- | src/leveldb/table/two_level_iterator.h | 31 |
18 files changed, 2976 insertions, 0 deletions
diff --git a/src/leveldb/table/block.cc b/src/leveldb/table/block.cc new file mode 100644 index 0000000000..2fe89eaa45 --- /dev/null +++ b/src/leveldb/table/block.cc @@ -0,0 +1,267 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// +// Decodes the blocks generated by block_builder.cc. + +#include "table/block.h" + +#include <algorithm> +#include <cstdint> +#include <vector> + +#include "leveldb/comparator.h" +#include "table/format.h" +#include "util/coding.h" +#include "util/logging.h" + +namespace leveldb { + +inline uint32_t Block::NumRestarts() const { + assert(size_ >= sizeof(uint32_t)); + return DecodeFixed32(data_ + size_ - sizeof(uint32_t)); +} + +Block::Block(const BlockContents& contents) + : data_(contents.data.data()), + size_(contents.data.size()), + owned_(contents.heap_allocated) { + if (size_ < sizeof(uint32_t)) { + size_ = 0; // Error marker + } else { + size_t max_restarts_allowed = (size_ - sizeof(uint32_t)) / sizeof(uint32_t); + if (NumRestarts() > max_restarts_allowed) { + // The size is too small for NumRestarts() + size_ = 0; + } else { + restart_offset_ = size_ - (1 + NumRestarts()) * sizeof(uint32_t); + } + } +} + +Block::~Block() { + if (owned_) { + delete[] data_; + } +} + +// Helper routine: decode the next block entry starting at "p", +// storing the number of shared key bytes, non_shared key bytes, +// and the length of the value in "*shared", "*non_shared", and +// "*value_length", respectively. Will not dereference past "limit". +// +// If any errors are detected, returns nullptr. Otherwise, returns a +// pointer to the key delta (just past the three decoded values). +static inline const char* DecodeEntry(const char* p, const char* limit, + uint32_t* shared, uint32_t* non_shared, + uint32_t* value_length) { + if (limit - p < 3) return nullptr; + *shared = reinterpret_cast<const uint8_t*>(p)[0]; + *non_shared = reinterpret_cast<const uint8_t*>(p)[1]; + *value_length = reinterpret_cast<const uint8_t*>(p)[2]; + if ((*shared | *non_shared | *value_length) < 128) { + // Fast path: all three values are encoded in one byte each + p += 3; + } else { + if ((p = GetVarint32Ptr(p, limit, shared)) == nullptr) return nullptr; + if ((p = GetVarint32Ptr(p, limit, non_shared)) == nullptr) return nullptr; + if ((p = GetVarint32Ptr(p, limit, value_length)) == nullptr) return nullptr; + } + + if (static_cast<uint32_t>(limit - p) < (*non_shared + *value_length)) { + return nullptr; + } + return p; +} + +class Block::Iter : public Iterator { + private: + const Comparator* const comparator_; + const char* const data_; // underlying block contents + uint32_t const restarts_; // Offset of restart array (list of fixed32) + uint32_t const num_restarts_; // Number of uint32_t entries in restart array + + // current_ is offset in data_ of current entry. >= restarts_ if !Valid + uint32_t current_; + uint32_t restart_index_; // Index of restart block in which current_ falls + std::string key_; + Slice value_; + Status status_; + + inline int Compare(const Slice& a, const Slice& b) const { + return comparator_->Compare(a, b); + } + + // Return the offset in data_ just past the end of the current entry. + inline uint32_t NextEntryOffset() const { + return (value_.data() + value_.size()) - data_; + } + + uint32_t GetRestartPoint(uint32_t index) { + assert(index < num_restarts_); + return DecodeFixed32(data_ + restarts_ + index * sizeof(uint32_t)); + } + + void SeekToRestartPoint(uint32_t index) { + key_.clear(); + restart_index_ = index; + // current_ will be fixed by ParseNextKey(); + + // ParseNextKey() starts at the end of value_, so set value_ accordingly + uint32_t offset = GetRestartPoint(index); + value_ = Slice(data_ + offset, 0); + } + + public: + Iter(const Comparator* comparator, const char* data, uint32_t restarts, + uint32_t num_restarts) + : comparator_(comparator), + data_(data), + restarts_(restarts), + num_restarts_(num_restarts), + current_(restarts_), + restart_index_(num_restarts_) { + assert(num_restarts_ > 0); + } + + bool Valid() const override { return current_ < restarts_; } + Status status() const override { return status_; } + Slice key() const override { + assert(Valid()); + return key_; + } + Slice value() const override { + assert(Valid()); + return value_; + } + + void Next() override { + assert(Valid()); + ParseNextKey(); + } + + void Prev() override { + assert(Valid()); + + // Scan backwards to a restart point before current_ + const uint32_t original = current_; + while (GetRestartPoint(restart_index_) >= original) { + if (restart_index_ == 0) { + // No more entries + current_ = restarts_; + restart_index_ = num_restarts_; + return; + } + restart_index_--; + } + + SeekToRestartPoint(restart_index_); + do { + // Loop until end of current entry hits the start of original entry + } while (ParseNextKey() && NextEntryOffset() < original); + } + + void Seek(const Slice& target) override { + // Binary search in restart array to find the last restart point + // with a key < target + uint32_t left = 0; + uint32_t right = num_restarts_ - 1; + while (left < right) { + uint32_t mid = (left + right + 1) / 2; + uint32_t region_offset = GetRestartPoint(mid); + uint32_t shared, non_shared, value_length; + const char* key_ptr = + DecodeEntry(data_ + region_offset, data_ + restarts_, &shared, + &non_shared, &value_length); + if (key_ptr == nullptr || (shared != 0)) { + CorruptionError(); + return; + } + Slice mid_key(key_ptr, non_shared); + if (Compare(mid_key, target) < 0) { + // Key at "mid" is smaller than "target". Therefore all + // blocks before "mid" are uninteresting. + left = mid; + } else { + // Key at "mid" is >= "target". Therefore all blocks at or + // after "mid" are uninteresting. + right = mid - 1; + } + } + + // Linear search (within restart block) for first key >= target + SeekToRestartPoint(left); + while (true) { + if (!ParseNextKey()) { + return; + } + if (Compare(key_, target) >= 0) { + return; + } + } + } + + void SeekToFirst() override { + SeekToRestartPoint(0); + ParseNextKey(); + } + + void SeekToLast() override { + SeekToRestartPoint(num_restarts_ - 1); + while (ParseNextKey() && NextEntryOffset() < restarts_) { + // Keep skipping + } + } + + private: + void CorruptionError() { + current_ = restarts_; + restart_index_ = num_restarts_; + status_ = Status::Corruption("bad entry in block"); + key_.clear(); + value_.clear(); + } + + bool ParseNextKey() { + current_ = NextEntryOffset(); + const char* p = data_ + current_; + const char* limit = data_ + restarts_; // Restarts come right after data + if (p >= limit) { + // No more entries to return. Mark as invalid. + current_ = restarts_; + restart_index_ = num_restarts_; + return false; + } + + // Decode next entry + uint32_t shared, non_shared, value_length; + p = DecodeEntry(p, limit, &shared, &non_shared, &value_length); + if (p == nullptr || key_.size() < shared) { + CorruptionError(); + return false; + } else { + key_.resize(shared); + key_.append(p, non_shared); + value_ = Slice(p + non_shared, value_length); + while (restart_index_ + 1 < num_restarts_ && + GetRestartPoint(restart_index_ + 1) < current_) { + ++restart_index_; + } + return true; + } + } +}; + +Iterator* Block::NewIterator(const Comparator* comparator) { + if (size_ < sizeof(uint32_t)) { + return NewErrorIterator(Status::Corruption("bad block contents")); + } + const uint32_t num_restarts = NumRestarts(); + if (num_restarts == 0) { + return NewEmptyIterator(); + } else { + return new Iter(comparator, data_, restart_offset_, num_restarts); + } +} + +} // namespace leveldb diff --git a/src/leveldb/table/block.h b/src/leveldb/table/block.h new file mode 100644 index 0000000000..c8f1f7b436 --- /dev/null +++ b/src/leveldb/table/block.h @@ -0,0 +1,44 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#ifndef STORAGE_LEVELDB_TABLE_BLOCK_H_ +#define STORAGE_LEVELDB_TABLE_BLOCK_H_ + +#include <stddef.h> +#include <stdint.h> + +#include "leveldb/iterator.h" + +namespace leveldb { + +struct BlockContents; +class Comparator; + +class Block { + public: + // Initialize the block with the specified contents. + explicit Block(const BlockContents& contents); + + Block(const Block&) = delete; + Block& operator=(const Block&) = delete; + + ~Block(); + + size_t size() const { return size_; } + Iterator* NewIterator(const Comparator* comparator); + + private: + class Iter; + + uint32_t NumRestarts() const; + + const char* data_; + size_t size_; + uint32_t restart_offset_; // Offset in data_ of restart array + bool owned_; // Block owns data_[] +}; + +} // namespace leveldb + +#endif // STORAGE_LEVELDB_TABLE_BLOCK_H_ diff --git a/src/leveldb/table/block_builder.cc b/src/leveldb/table/block_builder.cc new file mode 100644 index 0000000000..919cff5c93 --- /dev/null +++ b/src/leveldb/table/block_builder.cc @@ -0,0 +1,108 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// +// BlockBuilder generates blocks where keys are prefix-compressed: +// +// When we store a key, we drop the prefix shared with the previous +// string. This helps reduce the space requirement significantly. +// Furthermore, once every K keys, we do not apply the prefix +// compression and store the entire key. We call this a "restart +// point". The tail end of the block stores the offsets of all of the +// restart points, and can be used to do a binary search when looking +// for a particular key. Values are stored as-is (without compression) +// immediately following the corresponding key. +// +// An entry for a particular key-value pair has the form: +// shared_bytes: varint32 +// unshared_bytes: varint32 +// value_length: varint32 +// key_delta: char[unshared_bytes] +// value: char[value_length] +// shared_bytes == 0 for restart points. +// +// The trailer of the block has the form: +// restarts: uint32[num_restarts] +// num_restarts: uint32 +// restarts[i] contains the offset within the block of the ith restart point. + +#include "table/block_builder.h" + +#include <assert.h> + +#include <algorithm> + +#include "leveldb/comparator.h" +#include "leveldb/options.h" +#include "util/coding.h" + +namespace leveldb { + +BlockBuilder::BlockBuilder(const Options* options) + : options_(options), restarts_(), counter_(0), finished_(false) { + assert(options->block_restart_interval >= 1); + restarts_.push_back(0); // First restart point is at offset 0 +} + +void BlockBuilder::Reset() { + buffer_.clear(); + restarts_.clear(); + restarts_.push_back(0); // First restart point is at offset 0 + counter_ = 0; + finished_ = false; + last_key_.clear(); +} + +size_t BlockBuilder::CurrentSizeEstimate() const { + return (buffer_.size() + // Raw data buffer + restarts_.size() * sizeof(uint32_t) + // Restart array + sizeof(uint32_t)); // Restart array length +} + +Slice BlockBuilder::Finish() { + // Append restart array + for (size_t i = 0; i < restarts_.size(); i++) { + PutFixed32(&buffer_, restarts_[i]); + } + PutFixed32(&buffer_, restarts_.size()); + finished_ = true; + return Slice(buffer_); +} + +void BlockBuilder::Add(const Slice& key, const Slice& value) { + Slice last_key_piece(last_key_); + assert(!finished_); + assert(counter_ <= options_->block_restart_interval); + assert(buffer_.empty() // No values yet? + || options_->comparator->Compare(key, last_key_piece) > 0); + size_t shared = 0; + if (counter_ < options_->block_restart_interval) { + // See how much sharing to do with previous string + const size_t min_length = std::min(last_key_piece.size(), key.size()); + while ((shared < min_length) && (last_key_piece[shared] == key[shared])) { + shared++; + } + } else { + // Restart compression + restarts_.push_back(buffer_.size()); + counter_ = 0; + } + const size_t non_shared = key.size() - shared; + + // Add "<shared><non_shared><value_size>" to buffer_ + PutVarint32(&buffer_, shared); + PutVarint32(&buffer_, non_shared); + PutVarint32(&buffer_, value.size()); + + // Add string delta to buffer_ followed by value + buffer_.append(key.data() + shared, non_shared); + buffer_.append(value.data(), value.size()); + + // Update state + last_key_.resize(shared); + last_key_.append(key.data() + shared, non_shared); + assert(Slice(last_key_) == key); + counter_++; +} + +} // namespace leveldb diff --git a/src/leveldb/table/block_builder.h b/src/leveldb/table/block_builder.h new file mode 100644 index 0000000000..f91f5e6d47 --- /dev/null +++ b/src/leveldb/table/block_builder.h @@ -0,0 +1,55 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#ifndef STORAGE_LEVELDB_TABLE_BLOCK_BUILDER_H_ +#define STORAGE_LEVELDB_TABLE_BLOCK_BUILDER_H_ + +#include <stdint.h> + +#include <vector> + +#include "leveldb/slice.h" + +namespace leveldb { + +struct Options; + +class BlockBuilder { + public: + explicit BlockBuilder(const Options* options); + + BlockBuilder(const BlockBuilder&) = delete; + BlockBuilder& operator=(const BlockBuilder&) = delete; + + // Reset the contents as if the BlockBuilder was just constructed. + void Reset(); + + // REQUIRES: Finish() has not been called since the last call to Reset(). + // REQUIRES: key is larger than any previously added key + void Add(const Slice& key, const Slice& value); + + // Finish building the block and return a slice that refers to the + // block contents. The returned slice will remain valid for the + // lifetime of this builder or until Reset() is called. + Slice Finish(); + + // Returns an estimate of the current (uncompressed) size of the block + // we are building. + size_t CurrentSizeEstimate() const; + + // Return true iff no entries have been added since the last Reset() + bool empty() const { return buffer_.empty(); } + + private: + const Options* options_; + std::string buffer_; // Destination buffer + std::vector<uint32_t> restarts_; // Restart points + int counter_; // Number of entries emitted since restart + bool finished_; // Has Finish() been called? + std::string last_key_; +}; + +} // namespace leveldb + +#endif // STORAGE_LEVELDB_TABLE_BLOCK_BUILDER_H_ diff --git a/src/leveldb/table/filter_block.cc b/src/leveldb/table/filter_block.cc new file mode 100644 index 0000000000..09ec0094bd --- /dev/null +++ b/src/leveldb/table/filter_block.cc @@ -0,0 +1,106 @@ +// Copyright (c) 2012 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "table/filter_block.h" + +#include "leveldb/filter_policy.h" +#include "util/coding.h" + +namespace leveldb { + +// See doc/table_format.md for an explanation of the filter block format. + +// Generate new filter every 2KB of data +static const size_t kFilterBaseLg = 11; +static const size_t kFilterBase = 1 << kFilterBaseLg; + +FilterBlockBuilder::FilterBlockBuilder(const FilterPolicy* policy) + : policy_(policy) {} + +void FilterBlockBuilder::StartBlock(uint64_t block_offset) { + uint64_t filter_index = (block_offset / kFilterBase); + assert(filter_index >= filter_offsets_.size()); + while (filter_index > filter_offsets_.size()) { + GenerateFilter(); + } +} + +void FilterBlockBuilder::AddKey(const Slice& key) { + Slice k = key; + start_.push_back(keys_.size()); + keys_.append(k.data(), k.size()); +} + +Slice FilterBlockBuilder::Finish() { + if (!start_.empty()) { + GenerateFilter(); + } + + // Append array of per-filter offsets + const uint32_t array_offset = result_.size(); + for (size_t i = 0; i < filter_offsets_.size(); i++) { + PutFixed32(&result_, filter_offsets_[i]); + } + + PutFixed32(&result_, array_offset); + result_.push_back(kFilterBaseLg); // Save encoding parameter in result + return Slice(result_); +} + +void FilterBlockBuilder::GenerateFilter() { + const size_t num_keys = start_.size(); + if (num_keys == 0) { + // Fast path if there are no keys for this filter + filter_offsets_.push_back(result_.size()); + return; + } + + // Make list of keys from flattened key structure + start_.push_back(keys_.size()); // Simplify length computation + tmp_keys_.resize(num_keys); + for (size_t i = 0; i < num_keys; i++) { + const char* base = keys_.data() + start_[i]; + size_t length = start_[i + 1] - start_[i]; + tmp_keys_[i] = Slice(base, length); + } + + // Generate filter for current set of keys and append to result_. + filter_offsets_.push_back(result_.size()); + policy_->CreateFilter(&tmp_keys_[0], static_cast<int>(num_keys), &result_); + + tmp_keys_.clear(); + keys_.clear(); + start_.clear(); +} + +FilterBlockReader::FilterBlockReader(const FilterPolicy* policy, + const Slice& contents) + : policy_(policy), data_(nullptr), offset_(nullptr), num_(0), base_lg_(0) { + size_t n = contents.size(); + if (n < 5) return; // 1 byte for base_lg_ and 4 for start of offset array + base_lg_ = contents[n - 1]; + uint32_t last_word = DecodeFixed32(contents.data() + n - 5); + if (last_word > n - 5) return; + data_ = contents.data(); + offset_ = data_ + last_word; + num_ = (n - 5 - last_word) / 4; +} + +bool FilterBlockReader::KeyMayMatch(uint64_t block_offset, const Slice& key) { + uint64_t index = block_offset >> base_lg_; + if (index < num_) { + uint32_t start = DecodeFixed32(offset_ + index * 4); + uint32_t limit = DecodeFixed32(offset_ + index * 4 + 4); + if (start <= limit && limit <= static_cast<size_t>(offset_ - data_)) { + Slice filter = Slice(data_ + start, limit - start); + return policy_->KeyMayMatch(key, filter); + } else if (start == limit) { + // Empty filters do not match any keys + return false; + } + } + return true; // Errors are treated as potential matches +} + +} // namespace leveldb diff --git a/src/leveldb/table/filter_block.h b/src/leveldb/table/filter_block.h new file mode 100644 index 0000000000..73b5399249 --- /dev/null +++ b/src/leveldb/table/filter_block.h @@ -0,0 +1,69 @@ +// Copyright (c) 2012 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// +// A filter block is stored near the end of a Table file. It contains +// filters (e.g., bloom filters) for all data blocks in the table combined +// into a single filter block. + +#ifndef STORAGE_LEVELDB_TABLE_FILTER_BLOCK_H_ +#define STORAGE_LEVELDB_TABLE_FILTER_BLOCK_H_ + +#include <stddef.h> +#include <stdint.h> + +#include <string> +#include <vector> + +#include "leveldb/slice.h" +#include "util/hash.h" + +namespace leveldb { + +class FilterPolicy; + +// A FilterBlockBuilder is used to construct all of the filters for a +// particular Table. It generates a single string which is stored as +// a special block in the Table. +// +// The sequence of calls to FilterBlockBuilder must match the regexp: +// (StartBlock AddKey*)* Finish +class FilterBlockBuilder { + public: + explicit FilterBlockBuilder(const FilterPolicy*); + + FilterBlockBuilder(const FilterBlockBuilder&) = delete; + FilterBlockBuilder& operator=(const FilterBlockBuilder&) = delete; + + void StartBlock(uint64_t block_offset); + void AddKey(const Slice& key); + Slice Finish(); + + private: + void GenerateFilter(); + + const FilterPolicy* policy_; + std::string keys_; // Flattened key contents + std::vector<size_t> start_; // Starting index in keys_ of each key + std::string result_; // Filter data computed so far + std::vector<Slice> tmp_keys_; // policy_->CreateFilter() argument + std::vector<uint32_t> filter_offsets_; +}; + +class FilterBlockReader { + public: + // REQUIRES: "contents" and *policy must stay live while *this is live. + FilterBlockReader(const FilterPolicy* policy, const Slice& contents); + bool KeyMayMatch(uint64_t block_offset, const Slice& key); + + private: + const FilterPolicy* policy_; + const char* data_; // Pointer to filter data (at block-start) + const char* offset_; // Pointer to beginning of offset array (at block-end) + size_t num_; // Number of entries in offset array + size_t base_lg_; // Encoding parameter (see kFilterBaseLg in .cc file) +}; + +} // namespace leveldb + +#endif // STORAGE_LEVELDB_TABLE_FILTER_BLOCK_H_ diff --git a/src/leveldb/table/filter_block_test.cc b/src/leveldb/table/filter_block_test.cc new file mode 100644 index 0000000000..8b33bbdd18 --- /dev/null +++ b/src/leveldb/table/filter_block_test.cc @@ -0,0 +1,124 @@ +// Copyright (c) 2012 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "table/filter_block.h" + +#include "leveldb/filter_policy.h" +#include "util/coding.h" +#include "util/hash.h" +#include "util/logging.h" +#include "util/testharness.h" +#include "util/testutil.h" + +namespace leveldb { + +// For testing: emit an array with one hash value per key +class TestHashFilter : public FilterPolicy { + public: + const char* Name() const override { return "TestHashFilter"; } + + void CreateFilter(const Slice* keys, int n, std::string* dst) const override { + for (int i = 0; i < n; i++) { + uint32_t h = Hash(keys[i].data(), keys[i].size(), 1); + PutFixed32(dst, h); + } + } + + bool KeyMayMatch(const Slice& key, const Slice& filter) const override { + uint32_t h = Hash(key.data(), key.size(), 1); + for (size_t i = 0; i + 4 <= filter.size(); i += 4) { + if (h == DecodeFixed32(filter.data() + i)) { + return true; + } + } + return false; + } +}; + +class FilterBlockTest { + public: + TestHashFilter policy_; +}; + +TEST(FilterBlockTest, EmptyBuilder) { + FilterBlockBuilder builder(&policy_); + Slice block = builder.Finish(); + ASSERT_EQ("\\x00\\x00\\x00\\x00\\x0b", EscapeString(block)); + FilterBlockReader reader(&policy_, block); + ASSERT_TRUE(reader.KeyMayMatch(0, "foo")); + ASSERT_TRUE(reader.KeyMayMatch(100000, "foo")); +} + +TEST(FilterBlockTest, SingleChunk) { + FilterBlockBuilder builder(&policy_); + builder.StartBlock(100); + builder.AddKey("foo"); + builder.AddKey("bar"); + builder.AddKey("box"); + builder.StartBlock(200); + builder.AddKey("box"); + builder.StartBlock(300); + builder.AddKey("hello"); + Slice block = builder.Finish(); + FilterBlockReader reader(&policy_, block); + ASSERT_TRUE(reader.KeyMayMatch(100, "foo")); + ASSERT_TRUE(reader.KeyMayMatch(100, "bar")); + ASSERT_TRUE(reader.KeyMayMatch(100, "box")); + ASSERT_TRUE(reader.KeyMayMatch(100, "hello")); + ASSERT_TRUE(reader.KeyMayMatch(100, "foo")); + ASSERT_TRUE(!reader.KeyMayMatch(100, "missing")); + ASSERT_TRUE(!reader.KeyMayMatch(100, "other")); +} + +TEST(FilterBlockTest, MultiChunk) { + FilterBlockBuilder builder(&policy_); + + // First filter + builder.StartBlock(0); + builder.AddKey("foo"); + builder.StartBlock(2000); + builder.AddKey("bar"); + + // Second filter + builder.StartBlock(3100); + builder.AddKey("box"); + + // Third filter is empty + + // Last filter + builder.StartBlock(9000); + builder.AddKey("box"); + builder.AddKey("hello"); + + Slice block = builder.Finish(); + FilterBlockReader reader(&policy_, block); + + // Check first filter + ASSERT_TRUE(reader.KeyMayMatch(0, "foo")); + ASSERT_TRUE(reader.KeyMayMatch(2000, "bar")); + ASSERT_TRUE(!reader.KeyMayMatch(0, "box")); + ASSERT_TRUE(!reader.KeyMayMatch(0, "hello")); + + // Check second filter + ASSERT_TRUE(reader.KeyMayMatch(3100, "box")); + ASSERT_TRUE(!reader.KeyMayMatch(3100, "foo")); + ASSERT_TRUE(!reader.KeyMayMatch(3100, "bar")); + ASSERT_TRUE(!reader.KeyMayMatch(3100, "hello")); + + // Check third filter (empty) + ASSERT_TRUE(!reader.KeyMayMatch(4100, "foo")); + ASSERT_TRUE(!reader.KeyMayMatch(4100, "bar")); + ASSERT_TRUE(!reader.KeyMayMatch(4100, "box")); + ASSERT_TRUE(!reader.KeyMayMatch(4100, "hello")); + + // Check last filter + ASSERT_TRUE(reader.KeyMayMatch(9000, "box")); + ASSERT_TRUE(reader.KeyMayMatch(9000, "hello")); + ASSERT_TRUE(!reader.KeyMayMatch(9000, "foo")); + ASSERT_TRUE(!reader.KeyMayMatch(9000, "bar")); +} + +} // namespace leveldb + +int main(int argc, char** argv) { return leveldb::test::RunAllTests(); } diff --git a/src/leveldb/table/format.cc b/src/leveldb/table/format.cc new file mode 100644 index 0000000000..a3d67de2e4 --- /dev/null +++ b/src/leveldb/table/format.cc @@ -0,0 +1,141 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "table/format.h" + +#include "leveldb/env.h" +#include "port/port.h" +#include "table/block.h" +#include "util/coding.h" +#include "util/crc32c.h" + +namespace leveldb { + +void BlockHandle::EncodeTo(std::string* dst) const { + // Sanity check that all fields have been set + assert(offset_ != ~static_cast<uint64_t>(0)); + assert(size_ != ~static_cast<uint64_t>(0)); + PutVarint64(dst, offset_); + PutVarint64(dst, size_); +} + +Status BlockHandle::DecodeFrom(Slice* input) { + if (GetVarint64(input, &offset_) && GetVarint64(input, &size_)) { + return Status::OK(); + } else { + return Status::Corruption("bad block handle"); + } +} + +void Footer::EncodeTo(std::string* dst) const { + const size_t original_size = dst->size(); + metaindex_handle_.EncodeTo(dst); + index_handle_.EncodeTo(dst); + dst->resize(2 * BlockHandle::kMaxEncodedLength); // Padding + PutFixed32(dst, static_cast<uint32_t>(kTableMagicNumber & 0xffffffffu)); + PutFixed32(dst, static_cast<uint32_t>(kTableMagicNumber >> 32)); + assert(dst->size() == original_size + kEncodedLength); + (void)original_size; // Disable unused variable warning. +} + +Status Footer::DecodeFrom(Slice* input) { + const char* magic_ptr = input->data() + kEncodedLength - 8; + const uint32_t magic_lo = DecodeFixed32(magic_ptr); + const uint32_t magic_hi = DecodeFixed32(magic_ptr + 4); + const uint64_t magic = ((static_cast<uint64_t>(magic_hi) << 32) | + (static_cast<uint64_t>(magic_lo))); + if (magic != kTableMagicNumber) { + return Status::Corruption("not an sstable (bad magic number)"); + } + + Status result = metaindex_handle_.DecodeFrom(input); + if (result.ok()) { + result = index_handle_.DecodeFrom(input); + } + if (result.ok()) { + // We skip over any leftover data (just padding for now) in "input" + const char* end = magic_ptr + 8; + *input = Slice(end, input->data() + input->size() - end); + } + return result; +} + +Status ReadBlock(RandomAccessFile* file, const ReadOptions& options, + const BlockHandle& handle, BlockContents* result) { + result->data = Slice(); + result->cachable = false; + result->heap_allocated = false; + + // Read the block contents as well as the type/crc footer. + // See table_builder.cc for the code that built this structure. + size_t n = static_cast<size_t>(handle.size()); + char* buf = new char[n + kBlockTrailerSize]; + Slice contents; + Status s = file->Read(handle.offset(), n + kBlockTrailerSize, &contents, buf); + if (!s.ok()) { + delete[] buf; + return s; + } + if (contents.size() != n + kBlockTrailerSize) { + delete[] buf; + return Status::Corruption("truncated block read", file->GetName()); + } + + // Check the crc of the type and the block contents + const char* data = contents.data(); // Pointer to where Read put the data + if (options.verify_checksums) { + const uint32_t crc = crc32c::Unmask(DecodeFixed32(data + n + 1)); + const uint32_t actual = crc32c::Value(data, n + 1); + if (actual != crc) { + delete[] buf; + s = Status::Corruption("block checksum mismatch", file->GetName()); + return s; + } + } + + switch (data[n]) { + case kNoCompression: + if (data != buf) { + // File implementation gave us pointer to some other data. + // Use it directly under the assumption that it will be live + // while the file is open. + delete[] buf; + result->data = Slice(data, n); + result->heap_allocated = false; + result->cachable = false; // Do not double-cache + } else { + result->data = Slice(buf, n); + result->heap_allocated = true; + result->cachable = true; + } + + // Ok + break; + case kSnappyCompression: { + size_t ulength = 0; + if (!port::Snappy_GetUncompressedLength(data, n, &ulength)) { + delete[] buf; + return Status::Corruption("corrupted compressed block contents", file->GetName()); + } + char* ubuf = new char[ulength]; + if (!port::Snappy_Uncompress(data, n, ubuf)) { + delete[] buf; + delete[] ubuf; + return Status::Corruption("corrupted compressed block contents", file->GetName()); + } + delete[] buf; + result->data = Slice(ubuf, ulength); + result->heap_allocated = true; + result->cachable = true; + break; + } + default: + delete[] buf; + return Status::Corruption("bad block type", file->GetName()); + } + + return Status::OK(); +} + +} // namespace leveldb diff --git a/src/leveldb/table/format.h b/src/leveldb/table/format.h new file mode 100644 index 0000000000..e49dfdc047 --- /dev/null +++ b/src/leveldb/table/format.h @@ -0,0 +1,100 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#ifndef STORAGE_LEVELDB_TABLE_FORMAT_H_ +#define STORAGE_LEVELDB_TABLE_FORMAT_H_ + +#include <stdint.h> + +#include <string> + +#include "leveldb/slice.h" +#include "leveldb/status.h" +#include "leveldb/table_builder.h" + +namespace leveldb { + +class Block; +class RandomAccessFile; +struct ReadOptions; + +// BlockHandle is a pointer to the extent of a file that stores a data +// block or a meta block. +class BlockHandle { + public: + // Maximum encoding length of a BlockHandle + enum { kMaxEncodedLength = 10 + 10 }; + + BlockHandle(); + + // The offset of the block in the file. + uint64_t offset() const { return offset_; } + void set_offset(uint64_t offset) { offset_ = offset; } + + // The size of the stored block + uint64_t size() const { return size_; } + void set_size(uint64_t size) { size_ = size; } + + void EncodeTo(std::string* dst) const; + Status DecodeFrom(Slice* input); + + private: + uint64_t offset_; + uint64_t size_; +}; + +// Footer encapsulates the fixed information stored at the tail +// end of every table file. +class Footer { + public: + // Encoded length of a Footer. Note that the serialization of a + // Footer will always occupy exactly this many bytes. It consists + // of two block handles and a magic number. + enum { kEncodedLength = 2 * BlockHandle::kMaxEncodedLength + 8 }; + + Footer() = default; + + // The block handle for the metaindex block of the table + const BlockHandle& metaindex_handle() const { return metaindex_handle_; } + void set_metaindex_handle(const BlockHandle& h) { metaindex_handle_ = h; } + + // The block handle for the index block of the table + const BlockHandle& index_handle() const { return index_handle_; } + void set_index_handle(const BlockHandle& h) { index_handle_ = h; } + + void EncodeTo(std::string* dst) const; + Status DecodeFrom(Slice* input); + + private: + BlockHandle metaindex_handle_; + BlockHandle index_handle_; +}; + +// kTableMagicNumber was picked by running +// echo http://code.google.com/p/leveldb/ | sha1sum +// and taking the leading 64 bits. +static const uint64_t kTableMagicNumber = 0xdb4775248b80fb57ull; + +// 1-byte type + 32-bit crc +static const size_t kBlockTrailerSize = 5; + +struct BlockContents { + Slice data; // Actual contents of data + bool cachable; // True iff data can be cached + bool heap_allocated; // True iff caller should delete[] data.data() +}; + +// Read the block identified by "handle" from "file". On failure +// return non-OK. On success fill *result and return OK. +Status ReadBlock(RandomAccessFile* file, const ReadOptions& options, + const BlockHandle& handle, BlockContents* result); + +// Implementation details follow. Clients should ignore, + +inline BlockHandle::BlockHandle() + : offset_(~static_cast<uint64_t>(0)), size_(~static_cast<uint64_t>(0)) {} + +} // namespace leveldb + +#endif // STORAGE_LEVELDB_TABLE_FORMAT_H_ diff --git a/src/leveldb/table/iterator.cc b/src/leveldb/table/iterator.cc new file mode 100644 index 0000000000..dfef083d4d --- /dev/null +++ b/src/leveldb/table/iterator.cc @@ -0,0 +1,76 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "leveldb/iterator.h" + +namespace leveldb { + +Iterator::Iterator() { + cleanup_head_.function = nullptr; + cleanup_head_.next = nullptr; +} + +Iterator::~Iterator() { + if (!cleanup_head_.IsEmpty()) { + cleanup_head_.Run(); + for (CleanupNode* node = cleanup_head_.next; node != nullptr;) { + node->Run(); + CleanupNode* next_node = node->next; + delete node; + node = next_node; + } + } +} + +void Iterator::RegisterCleanup(CleanupFunction func, void* arg1, void* arg2) { + assert(func != nullptr); + CleanupNode* node; + if (cleanup_head_.IsEmpty()) { + node = &cleanup_head_; + } else { + node = new CleanupNode(); + node->next = cleanup_head_.next; + cleanup_head_.next = node; + } + node->function = func; + node->arg1 = arg1; + node->arg2 = arg2; +} + +namespace { + +class EmptyIterator : public Iterator { + public: + EmptyIterator(const Status& s) : status_(s) {} + ~EmptyIterator() override = default; + + bool Valid() const override { return false; } + void Seek(const Slice& target) override {} + void SeekToFirst() override {} + void SeekToLast() override {} + void Next() override { assert(false); } + void Prev() override { assert(false); } + Slice key() const override { + assert(false); + return Slice(); + } + Slice value() const override { + assert(false); + return Slice(); + } + Status status() const override { return status_; } + + private: + Status status_; +}; + +} // anonymous namespace + +Iterator* NewEmptyIterator() { return new EmptyIterator(Status::OK()); } + +Iterator* NewErrorIterator(const Status& status) { + return new EmptyIterator(status); +} + +} // namespace leveldb diff --git a/src/leveldb/table/iterator_wrapper.h b/src/leveldb/table/iterator_wrapper.h new file mode 100644 index 0000000000..c230572529 --- /dev/null +++ b/src/leveldb/table/iterator_wrapper.h @@ -0,0 +1,92 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#ifndef STORAGE_LEVELDB_TABLE_ITERATOR_WRAPPER_H_ +#define STORAGE_LEVELDB_TABLE_ITERATOR_WRAPPER_H_ + +#include "leveldb/iterator.h" +#include "leveldb/slice.h" + +namespace leveldb { + +// A internal wrapper class with an interface similar to Iterator that +// caches the valid() and key() results for an underlying iterator. +// This can help avoid virtual function calls and also gives better +// cache locality. +class IteratorWrapper { + public: + IteratorWrapper() : iter_(nullptr), valid_(false) {} + explicit IteratorWrapper(Iterator* iter) : iter_(nullptr) { Set(iter); } + ~IteratorWrapper() { delete iter_; } + Iterator* iter() const { return iter_; } + + // Takes ownership of "iter" and will delete it when destroyed, or + // when Set() is invoked again. + void Set(Iterator* iter) { + delete iter_; + iter_ = iter; + if (iter_ == nullptr) { + valid_ = false; + } else { + Update(); + } + } + + // Iterator interface methods + bool Valid() const { return valid_; } + Slice key() const { + assert(Valid()); + return key_; + } + Slice value() const { + assert(Valid()); + return iter_->value(); + } + // Methods below require iter() != nullptr + Status status() const { + assert(iter_); + return iter_->status(); + } + void Next() { + assert(iter_); + iter_->Next(); + Update(); + } + void Prev() { + assert(iter_); + iter_->Prev(); + Update(); + } + void Seek(const Slice& k) { + assert(iter_); + iter_->Seek(k); + Update(); + } + void SeekToFirst() { + assert(iter_); + iter_->SeekToFirst(); + Update(); + } + void SeekToLast() { + assert(iter_); + iter_->SeekToLast(); + Update(); + } + + private: + void Update() { + valid_ = iter_->Valid(); + if (valid_) { + key_ = iter_->key(); + } + } + + Iterator* iter_; + bool valid_; + Slice key_; +}; + +} // namespace leveldb + +#endif // STORAGE_LEVELDB_TABLE_ITERATOR_WRAPPER_H_ diff --git a/src/leveldb/table/merger.cc b/src/leveldb/table/merger.cc new file mode 100644 index 0000000000..76441b1cc2 --- /dev/null +++ b/src/leveldb/table/merger.cc @@ -0,0 +1,191 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "table/merger.h" + +#include "leveldb/comparator.h" +#include "leveldb/iterator.h" +#include "table/iterator_wrapper.h" + +namespace leveldb { + +namespace { +class MergingIterator : public Iterator { + public: + MergingIterator(const Comparator* comparator, Iterator** children, int n) + : comparator_(comparator), + children_(new IteratorWrapper[n]), + n_(n), + current_(nullptr), + direction_(kForward) { + for (int i = 0; i < n; i++) { + children_[i].Set(children[i]); + } + } + + ~MergingIterator() override { delete[] children_; } + + bool Valid() const override { return (current_ != nullptr); } + + void SeekToFirst() override { + for (int i = 0; i < n_; i++) { + children_[i].SeekToFirst(); + } + FindSmallest(); + direction_ = kForward; + } + + void SeekToLast() override { + for (int i = 0; i < n_; i++) { + children_[i].SeekToLast(); + } + FindLargest(); + direction_ = kReverse; + } + + void Seek(const Slice& target) override { + for (int i = 0; i < n_; i++) { + children_[i].Seek(target); + } + FindSmallest(); + direction_ = kForward; + } + + void Next() override { + assert(Valid()); + + // Ensure that all children are positioned after key(). + // If we are moving in the forward direction, it is already + // true for all of the non-current_ children since current_ is + // the smallest child and key() == current_->key(). Otherwise, + // we explicitly position the non-current_ children. + if (direction_ != kForward) { + for (int i = 0; i < n_; i++) { + IteratorWrapper* child = &children_[i]; + if (child != current_) { + child->Seek(key()); + if (child->Valid() && + comparator_->Compare(key(), child->key()) == 0) { + child->Next(); + } + } + } + direction_ = kForward; + } + + current_->Next(); + FindSmallest(); + } + + void Prev() override { + assert(Valid()); + + // Ensure that all children are positioned before key(). + // If we are moving in the reverse direction, it is already + // true for all of the non-current_ children since current_ is + // the largest child and key() == current_->key(). Otherwise, + // we explicitly position the non-current_ children. + if (direction_ != kReverse) { + for (int i = 0; i < n_; i++) { + IteratorWrapper* child = &children_[i]; + if (child != current_) { + child->Seek(key()); + if (child->Valid()) { + // Child is at first entry >= key(). Step back one to be < key() + child->Prev(); + } else { + // Child has no entries >= key(). Position at last entry. + child->SeekToLast(); + } + } + } + direction_ = kReverse; + } + + current_->Prev(); + FindLargest(); + } + + Slice key() const override { + assert(Valid()); + return current_->key(); + } + + Slice value() const override { + assert(Valid()); + return current_->value(); + } + + Status status() const override { + Status status; + for (int i = 0; i < n_; i++) { + status = children_[i].status(); + if (!status.ok()) { + break; + } + } + return status; + } + + private: + // Which direction is the iterator moving? + enum Direction { kForward, kReverse }; + + void FindSmallest(); + void FindLargest(); + + // We might want to use a heap in case there are lots of children. + // For now we use a simple array since we expect a very small number + // of children in leveldb. + const Comparator* comparator_; + IteratorWrapper* children_; + int n_; + IteratorWrapper* current_; + Direction direction_; +}; + +void MergingIterator::FindSmallest() { + IteratorWrapper* smallest = nullptr; + for (int i = 0; i < n_; i++) { + IteratorWrapper* child = &children_[i]; + if (child->Valid()) { + if (smallest == nullptr) { + smallest = child; + } else if (comparator_->Compare(child->key(), smallest->key()) < 0) { + smallest = child; + } + } + } + current_ = smallest; +} + +void MergingIterator::FindLargest() { + IteratorWrapper* largest = nullptr; + for (int i = n_ - 1; i >= 0; i--) { + IteratorWrapper* child = &children_[i]; + if (child->Valid()) { + if (largest == nullptr) { + largest = child; + } else if (comparator_->Compare(child->key(), largest->key()) > 0) { + largest = child; + } + } + } + current_ = largest; +} +} // namespace + +Iterator* NewMergingIterator(const Comparator* comparator, Iterator** children, + int n) { + assert(n >= 0); + if (n == 0) { + return NewEmptyIterator(); + } else if (n == 1) { + return children[0]; + } else { + return new MergingIterator(comparator, children, n); + } +} + +} // namespace leveldb diff --git a/src/leveldb/table/merger.h b/src/leveldb/table/merger.h new file mode 100644 index 0000000000..41cedc5254 --- /dev/null +++ b/src/leveldb/table/merger.h @@ -0,0 +1,26 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#ifndef STORAGE_LEVELDB_TABLE_MERGER_H_ +#define STORAGE_LEVELDB_TABLE_MERGER_H_ + +namespace leveldb { + +class Comparator; +class Iterator; + +// Return an iterator that provided the union of the data in +// children[0,n-1]. Takes ownership of the child iterators and +// will delete them when the result iterator is deleted. +// +// The result does no duplicate suppression. I.e., if a particular +// key is present in K child iterators, it will be yielded K times. +// +// REQUIRES: n >= 0 +Iterator* NewMergingIterator(const Comparator* comparator, Iterator** children, + int n); + +} // namespace leveldb + +#endif // STORAGE_LEVELDB_TABLE_MERGER_H_ diff --git a/src/leveldb/table/table.cc b/src/leveldb/table/table.cc new file mode 100644 index 0000000000..b07bc88c7e --- /dev/null +++ b/src/leveldb/table/table.cc @@ -0,0 +1,273 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "leveldb/table.h" + +#include "leveldb/cache.h" +#include "leveldb/comparator.h" +#include "leveldb/env.h" +#include "leveldb/filter_policy.h" +#include "leveldb/options.h" +#include "table/block.h" +#include "table/filter_block.h" +#include "table/format.h" +#include "table/two_level_iterator.h" +#include "util/coding.h" + +namespace leveldb { + +struct Table::Rep { + ~Rep() { + delete filter; + delete[] filter_data; + delete index_block; + } + + Options options; + Status status; + RandomAccessFile* file; + uint64_t cache_id; + FilterBlockReader* filter; + const char* filter_data; + + BlockHandle metaindex_handle; // Handle to metaindex_block: saved from footer + Block* index_block; +}; + +Status Table::Open(const Options& options, RandomAccessFile* file, + uint64_t size, Table** table) { + *table = nullptr; + if (size < Footer::kEncodedLength) { + return Status::Corruption("file is too short to be an sstable"); + } + + char footer_space[Footer::kEncodedLength]; + Slice footer_input; + Status s = file->Read(size - Footer::kEncodedLength, Footer::kEncodedLength, + &footer_input, footer_space); + if (!s.ok()) return s; + + Footer footer; + s = footer.DecodeFrom(&footer_input); + if (!s.ok()) return s; + + // Read the index block + BlockContents index_block_contents; + if (s.ok()) { + ReadOptions opt; + if (options.paranoid_checks) { + opt.verify_checksums = true; + } + s = ReadBlock(file, opt, footer.index_handle(), &index_block_contents); + } + + if (s.ok()) { + // We've successfully read the footer and the index block: we're + // ready to serve requests. + Block* index_block = new Block(index_block_contents); + Rep* rep = new Table::Rep; + rep->options = options; + rep->file = file; + rep->metaindex_handle = footer.metaindex_handle(); + rep->index_block = index_block; + rep->cache_id = (options.block_cache ? options.block_cache->NewId() : 0); + rep->filter_data = nullptr; + rep->filter = nullptr; + *table = new Table(rep); + (*table)->ReadMeta(footer); + } + + return s; +} + +void Table::ReadMeta(const Footer& footer) { + if (rep_->options.filter_policy == nullptr) { + return; // Do not need any metadata + } + + // TODO(sanjay): Skip this if footer.metaindex_handle() size indicates + // it is an empty block. + ReadOptions opt; + if (rep_->options.paranoid_checks) { + opt.verify_checksums = true; + } + BlockContents contents; + if (!ReadBlock(rep_->file, opt, footer.metaindex_handle(), &contents).ok()) { + // Do not propagate errors since meta info is not needed for operation + return; + } + Block* meta = new Block(contents); + + Iterator* iter = meta->NewIterator(BytewiseComparator()); + std::string key = "filter."; + key.append(rep_->options.filter_policy->Name()); + iter->Seek(key); + if (iter->Valid() && iter->key() == Slice(key)) { + ReadFilter(iter->value()); + } + delete iter; + delete meta; +} + +void Table::ReadFilter(const Slice& filter_handle_value) { + Slice v = filter_handle_value; + BlockHandle filter_handle; + if (!filter_handle.DecodeFrom(&v).ok()) { + return; + } + + // We might want to unify with ReadBlock() if we start + // requiring checksum verification in Table::Open. + ReadOptions opt; + if (rep_->options.paranoid_checks) { + opt.verify_checksums = true; + } + BlockContents block; + if (!ReadBlock(rep_->file, opt, filter_handle, &block).ok()) { + return; + } + if (block.heap_allocated) { + rep_->filter_data = block.data.data(); // Will need to delete later + } + rep_->filter = new FilterBlockReader(rep_->options.filter_policy, block.data); +} + +Table::~Table() { delete rep_; } + +static void DeleteBlock(void* arg, void* ignored) { + delete reinterpret_cast<Block*>(arg); +} + +static void DeleteCachedBlock(const Slice& key, void* value) { + Block* block = reinterpret_cast<Block*>(value); + delete block; +} + +static void ReleaseBlock(void* arg, void* h) { + Cache* cache = reinterpret_cast<Cache*>(arg); + Cache::Handle* handle = reinterpret_cast<Cache::Handle*>(h); + cache->Release(handle); +} + +// Convert an index iterator value (i.e., an encoded BlockHandle) +// into an iterator over the contents of the corresponding block. +Iterator* Table::BlockReader(void* arg, const ReadOptions& options, + const Slice& index_value) { + Table* table = reinterpret_cast<Table*>(arg); + Cache* block_cache = table->rep_->options.block_cache; + Block* block = nullptr; + Cache::Handle* cache_handle = nullptr; + + BlockHandle handle; + Slice input = index_value; + Status s = handle.DecodeFrom(&input); + // We intentionally allow extra stuff in index_value so that we + // can add more features in the future. + + if (s.ok()) { + BlockContents contents; + if (block_cache != nullptr) { + char cache_key_buffer[16]; + EncodeFixed64(cache_key_buffer, table->rep_->cache_id); + EncodeFixed64(cache_key_buffer + 8, handle.offset()); + Slice key(cache_key_buffer, sizeof(cache_key_buffer)); + cache_handle = block_cache->Lookup(key); + if (cache_handle != nullptr) { + block = reinterpret_cast<Block*>(block_cache->Value(cache_handle)); + } else { + s = ReadBlock(table->rep_->file, options, handle, &contents); + if (s.ok()) { + block = new Block(contents); + if (contents.cachable && options.fill_cache) { + cache_handle = block_cache->Insert(key, block, block->size(), + &DeleteCachedBlock); + } + } + } + } else { + s = ReadBlock(table->rep_->file, options, handle, &contents); + if (s.ok()) { + block = new Block(contents); + } + } + } + + Iterator* iter; + if (block != nullptr) { + iter = block->NewIterator(table->rep_->options.comparator); + if (cache_handle == nullptr) { + iter->RegisterCleanup(&DeleteBlock, block, nullptr); + } else { + iter->RegisterCleanup(&ReleaseBlock, block_cache, cache_handle); + } + } else { + iter = NewErrorIterator(s); + } + return iter; +} + +Iterator* Table::NewIterator(const ReadOptions& options) const { + return NewTwoLevelIterator( + rep_->index_block->NewIterator(rep_->options.comparator), + &Table::BlockReader, const_cast<Table*>(this), options); +} + +Status Table::InternalGet(const ReadOptions& options, const Slice& k, void* arg, + void (*handle_result)(void*, const Slice&, + const Slice&)) { + Status s; + Iterator* iiter = rep_->index_block->NewIterator(rep_->options.comparator); + iiter->Seek(k); + if (iiter->Valid()) { + Slice handle_value = iiter->value(); + FilterBlockReader* filter = rep_->filter; + BlockHandle handle; + if (filter != nullptr && handle.DecodeFrom(&handle_value).ok() && + !filter->KeyMayMatch(handle.offset(), k)) { + // Not found + } else { + Iterator* block_iter = BlockReader(this, options, iiter->value()); + block_iter->Seek(k); + if (block_iter->Valid()) { + (*handle_result)(arg, block_iter->key(), block_iter->value()); + } + s = block_iter->status(); + delete block_iter; + } + } + if (s.ok()) { + s = iiter->status(); + } + delete iiter; + return s; +} + +uint64_t Table::ApproximateOffsetOf(const Slice& key) const { + Iterator* index_iter = + rep_->index_block->NewIterator(rep_->options.comparator); + index_iter->Seek(key); + uint64_t result; + if (index_iter->Valid()) { + BlockHandle handle; + Slice input = index_iter->value(); + Status s = handle.DecodeFrom(&input); + if (s.ok()) { + result = handle.offset(); + } else { + // Strange: we can't decode the block handle in the index block. + // We'll just return the offset of the metaindex block, which is + // close to the whole file size for this case. + result = rep_->metaindex_handle.offset(); + } + } else { + // key is past the last key in the file. Approximate the offset + // by returning the offset of the metaindex block (which is + // right near the end of the file). + result = rep_->metaindex_handle.offset(); + } + delete index_iter; + return result; +} + +} // namespace leveldb diff --git a/src/leveldb/table/table_builder.cc b/src/leveldb/table/table_builder.cc new file mode 100644 index 0000000000..278febf94f --- /dev/null +++ b/src/leveldb/table/table_builder.cc @@ -0,0 +1,265 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "leveldb/table_builder.h" + +#include <assert.h> + +#include "leveldb/comparator.h" +#include "leveldb/env.h" +#include "leveldb/filter_policy.h" +#include "leveldb/options.h" +#include "table/block_builder.h" +#include "table/filter_block.h" +#include "table/format.h" +#include "util/coding.h" +#include "util/crc32c.h" + +namespace leveldb { + +struct TableBuilder::Rep { + Rep(const Options& opt, WritableFile* f) + : options(opt), + index_block_options(opt), + file(f), + offset(0), + data_block(&options), + index_block(&index_block_options), + num_entries(0), + closed(false), + filter_block(opt.filter_policy == nullptr + ? nullptr + : new FilterBlockBuilder(opt.filter_policy)), + pending_index_entry(false) { + index_block_options.block_restart_interval = 1; + } + + Options options; + Options index_block_options; + WritableFile* file; + uint64_t offset; + Status status; + BlockBuilder data_block; + BlockBuilder index_block; + std::string last_key; + int64_t num_entries; + bool closed; // Either Finish() or Abandon() has been called. + FilterBlockBuilder* filter_block; + + // We do not emit the index entry for a block until we have seen the + // first key for the next data block. This allows us to use shorter + // keys in the index block. For example, consider a block boundary + // between the keys "the quick brown fox" and "the who". We can use + // "the r" as the key for the index block entry since it is >= all + // entries in the first block and < all entries in subsequent + // blocks. + // + // Invariant: r->pending_index_entry is true only if data_block is empty. + bool pending_index_entry; + BlockHandle pending_handle; // Handle to add to index block + + std::string compressed_output; +}; + +TableBuilder::TableBuilder(const Options& options, WritableFile* file) + : rep_(new Rep(options, file)) { + if (rep_->filter_block != nullptr) { + rep_->filter_block->StartBlock(0); + } +} + +TableBuilder::~TableBuilder() { + assert(rep_->closed); // Catch errors where caller forgot to call Finish() + delete rep_->filter_block; + delete rep_; +} + +Status TableBuilder::ChangeOptions(const Options& options) { + // Note: if more fields are added to Options, update + // this function to catch changes that should not be allowed to + // change in the middle of building a Table. + if (options.comparator != rep_->options.comparator) { + return Status::InvalidArgument("changing comparator while building table"); + } + + // Note that any live BlockBuilders point to rep_->options and therefore + // will automatically pick up the updated options. + rep_->options = options; + rep_->index_block_options = options; + rep_->index_block_options.block_restart_interval = 1; + return Status::OK(); +} + +void TableBuilder::Add(const Slice& key, const Slice& value) { + Rep* r = rep_; + assert(!r->closed); + if (!ok()) return; + if (r->num_entries > 0) { + assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0); + } + + if (r->pending_index_entry) { + assert(r->data_block.empty()); + r->options.comparator->FindShortestSeparator(&r->last_key, key); + std::string handle_encoding; + r->pending_handle.EncodeTo(&handle_encoding); + r->index_block.Add(r->last_key, Slice(handle_encoding)); + r->pending_index_entry = false; + } + + if (r->filter_block != nullptr) { + r->filter_block->AddKey(key); + } + + r->last_key.assign(key.data(), key.size()); + r->num_entries++; + r->data_block.Add(key, value); + + const size_t estimated_block_size = r->data_block.CurrentSizeEstimate(); + if (estimated_block_size >= r->options.block_size) { + Flush(); + } +} + +void TableBuilder::Flush() { + Rep* r = rep_; + assert(!r->closed); + if (!ok()) return; + if (r->data_block.empty()) return; + assert(!r->pending_index_entry); + WriteBlock(&r->data_block, &r->pending_handle); + if (ok()) { + r->pending_index_entry = true; + r->status = r->file->Flush(); + } + if (r->filter_block != nullptr) { + r->filter_block->StartBlock(r->offset); + } +} + +void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) { + // File format contains a sequence of blocks where each block has: + // block_data: uint8[n] + // type: uint8 + // crc: uint32 + assert(ok()); + Rep* r = rep_; + Slice raw = block->Finish(); + + Slice block_contents; + CompressionType type = r->options.compression; + // TODO(postrelease): Support more compression options: zlib? + switch (type) { + case kNoCompression: + block_contents = raw; + break; + + case kSnappyCompression: { + std::string* compressed = &r->compressed_output; + if (port::Snappy_Compress(raw.data(), raw.size(), compressed) && + compressed->size() < raw.size() - (raw.size() / 8u)) { + block_contents = *compressed; + } else { + // Snappy not supported, or compressed less than 12.5%, so just + // store uncompressed form + block_contents = raw; + type = kNoCompression; + } + break; + } + } + WriteRawBlock(block_contents, type, handle); + r->compressed_output.clear(); + block->Reset(); +} + +void TableBuilder::WriteRawBlock(const Slice& block_contents, + CompressionType type, BlockHandle* handle) { + Rep* r = rep_; + handle->set_offset(r->offset); + handle->set_size(block_contents.size()); + r->status = r->file->Append(block_contents); + if (r->status.ok()) { + char trailer[kBlockTrailerSize]; + trailer[0] = type; + uint32_t crc = crc32c::Value(block_contents.data(), block_contents.size()); + crc = crc32c::Extend(crc, trailer, 1); // Extend crc to cover block type + EncodeFixed32(trailer + 1, crc32c::Mask(crc)); + r->status = r->file->Append(Slice(trailer, kBlockTrailerSize)); + if (r->status.ok()) { + r->offset += block_contents.size() + kBlockTrailerSize; + } + } +} + +Status TableBuilder::status() const { return rep_->status; } + +Status TableBuilder::Finish() { + Rep* r = rep_; + Flush(); + assert(!r->closed); + r->closed = true; + + BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle; + + // Write filter block + if (ok() && r->filter_block != nullptr) { + WriteRawBlock(r->filter_block->Finish(), kNoCompression, + &filter_block_handle); + } + + // Write metaindex block + if (ok()) { + BlockBuilder meta_index_block(&r->options); + if (r->filter_block != nullptr) { + // Add mapping from "filter.Name" to location of filter data + std::string key = "filter."; + key.append(r->options.filter_policy->Name()); + std::string handle_encoding; + filter_block_handle.EncodeTo(&handle_encoding); + meta_index_block.Add(key, handle_encoding); + } + + // TODO(postrelease): Add stats and other meta blocks + WriteBlock(&meta_index_block, &metaindex_block_handle); + } + + // Write index block + if (ok()) { + if (r->pending_index_entry) { + r->options.comparator->FindShortSuccessor(&r->last_key); + std::string handle_encoding; + r->pending_handle.EncodeTo(&handle_encoding); + r->index_block.Add(r->last_key, Slice(handle_encoding)); + r->pending_index_entry = false; + } + WriteBlock(&r->index_block, &index_block_handle); + } + + // Write footer + if (ok()) { + Footer footer; + footer.set_metaindex_handle(metaindex_block_handle); + footer.set_index_handle(index_block_handle); + std::string footer_encoding; + footer.EncodeTo(&footer_encoding); + r->status = r->file->Append(footer_encoding); + if (r->status.ok()) { + r->offset += footer_encoding.size(); + } + } + return r->status; +} + +void TableBuilder::Abandon() { + Rep* r = rep_; + assert(!r->closed); + r->closed = true; +} + +uint64_t TableBuilder::NumEntries() const { return rep_->num_entries; } + +uint64_t TableBuilder::FileSize() const { return rep_->offset; } + +} // namespace leveldb diff --git a/src/leveldb/table/table_test.cc b/src/leveldb/table/table_test.cc new file mode 100644 index 0000000000..17aaea2f9e --- /dev/null +++ b/src/leveldb/table/table_test.cc @@ -0,0 +1,837 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "leveldb/table.h" + +#include <map> +#include <string> + +#include "db/dbformat.h" +#include "db/memtable.h" +#include "db/write_batch_internal.h" +#include "leveldb/db.h" +#include "leveldb/env.h" +#include "leveldb/iterator.h" +#include "leveldb/table_builder.h" +#include "table/block.h" +#include "table/block_builder.h" +#include "table/format.h" +#include "util/random.h" +#include "util/testharness.h" +#include "util/testutil.h" + +namespace leveldb { + +// Return reverse of "key". +// Used to test non-lexicographic comparators. +static std::string Reverse(const Slice& key) { + std::string str(key.ToString()); + std::string rev(""); + for (std::string::reverse_iterator rit = str.rbegin(); rit != str.rend(); + ++rit) { + rev.push_back(*rit); + } + return rev; +} + +namespace { +class ReverseKeyComparator : public Comparator { + public: + const char* Name() const override { + return "leveldb.ReverseBytewiseComparator"; + } + + int Compare(const Slice& a, const Slice& b) const override { + return BytewiseComparator()->Compare(Reverse(a), Reverse(b)); + } + + void FindShortestSeparator(std::string* start, + const Slice& limit) const override { + std::string s = Reverse(*start); + std::string l = Reverse(limit); + BytewiseComparator()->FindShortestSeparator(&s, l); + *start = Reverse(s); + } + + void FindShortSuccessor(std::string* key) const override { + std::string s = Reverse(*key); + BytewiseComparator()->FindShortSuccessor(&s); + *key = Reverse(s); + } +}; +} // namespace +static ReverseKeyComparator reverse_key_comparator; + +static void Increment(const Comparator* cmp, std::string* key) { + if (cmp == BytewiseComparator()) { + key->push_back('\0'); + } else { + assert(cmp == &reverse_key_comparator); + std::string rev = Reverse(*key); + rev.push_back('\0'); + *key = Reverse(rev); + } +} + +// An STL comparator that uses a Comparator +namespace { +struct STLLessThan { + const Comparator* cmp; + + STLLessThan() : cmp(BytewiseComparator()) {} + STLLessThan(const Comparator* c) : cmp(c) {} + bool operator()(const std::string& a, const std::string& b) const { + return cmp->Compare(Slice(a), Slice(b)) < 0; + } +}; +} // namespace + +class StringSink : public WritableFile { + public: + ~StringSink() override = default; + + const std::string& contents() const { return contents_; } + + Status Close() override { return Status::OK(); } + Status Flush() override { return Status::OK(); } + Status Sync() override { return Status::OK(); } + + Status Append(const Slice& data) override { + contents_.append(data.data(), data.size()); + return Status::OK(); + } + + std::string GetName() const override { return ""; } + private: + std::string contents_; +}; + +class StringSource : public RandomAccessFile { + public: + StringSource(const Slice& contents) + : contents_(contents.data(), contents.size()) {} + + ~StringSource() override = default; + + uint64_t Size() const { return contents_.size(); } + + Status Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const override { + if (offset >= contents_.size()) { + return Status::InvalidArgument("invalid Read offset"); + } + if (offset + n > contents_.size()) { + n = contents_.size() - offset; + } + memcpy(scratch, &contents_[offset], n); + *result = Slice(scratch, n); + return Status::OK(); + } + + std::string GetName() const { return ""; } + private: + std::string contents_; +}; + +typedef std::map<std::string, std::string, STLLessThan> KVMap; + +// Helper class for tests to unify the interface between +// BlockBuilder/TableBuilder and Block/Table. +class Constructor { + public: + explicit Constructor(const Comparator* cmp) : data_(STLLessThan(cmp)) {} + virtual ~Constructor() = default; + + void Add(const std::string& key, const Slice& value) { + data_[key] = value.ToString(); + } + + // Finish constructing the data structure with all the keys that have + // been added so far. Returns the keys in sorted order in "*keys" + // and stores the key/value pairs in "*kvmap" + void Finish(const Options& options, std::vector<std::string>* keys, + KVMap* kvmap) { + *kvmap = data_; + keys->clear(); + for (const auto& kvp : data_) { + keys->push_back(kvp.first); + } + data_.clear(); + Status s = FinishImpl(options, *kvmap); + ASSERT_TRUE(s.ok()) << s.ToString(); + } + + // Construct the data structure from the data in "data" + virtual Status FinishImpl(const Options& options, const KVMap& data) = 0; + + virtual Iterator* NewIterator() const = 0; + + const KVMap& data() const { return data_; } + + virtual DB* db() const { return nullptr; } // Overridden in DBConstructor + + private: + KVMap data_; +}; + +class BlockConstructor : public Constructor { + public: + explicit BlockConstructor(const Comparator* cmp) + : Constructor(cmp), comparator_(cmp), block_(nullptr) {} + ~BlockConstructor() override { delete block_; } + Status FinishImpl(const Options& options, const KVMap& data) override { + delete block_; + block_ = nullptr; + BlockBuilder builder(&options); + + for (const auto& kvp : data) { + builder.Add(kvp.first, kvp.second); + } + // Open the block + data_ = builder.Finish().ToString(); + BlockContents contents; + contents.data = data_; + contents.cachable = false; + contents.heap_allocated = false; + block_ = new Block(contents); + return Status::OK(); + } + Iterator* NewIterator() const override { + return block_->NewIterator(comparator_); + } + + private: + const Comparator* const comparator_; + std::string data_; + Block* block_; + + BlockConstructor(); +}; + +class TableConstructor : public Constructor { + public: + TableConstructor(const Comparator* cmp) + : Constructor(cmp), source_(nullptr), table_(nullptr) {} + ~TableConstructor() override { Reset(); } + Status FinishImpl(const Options& options, const KVMap& data) override { + Reset(); + StringSink sink; + TableBuilder builder(options, &sink); + + for (const auto& kvp : data) { + builder.Add(kvp.first, kvp.second); + ASSERT_TRUE(builder.status().ok()); + } + Status s = builder.Finish(); + ASSERT_TRUE(s.ok()) << s.ToString(); + + ASSERT_EQ(sink.contents().size(), builder.FileSize()); + + // Open the table + source_ = new StringSource(sink.contents()); + Options table_options; + table_options.comparator = options.comparator; + return Table::Open(table_options, source_, sink.contents().size(), &table_); + } + + Iterator* NewIterator() const override { + return table_->NewIterator(ReadOptions()); + } + + uint64_t ApproximateOffsetOf(const Slice& key) const { + return table_->ApproximateOffsetOf(key); + } + + private: + void Reset() { + delete table_; + delete source_; + table_ = nullptr; + source_ = nullptr; + } + + StringSource* source_; + Table* table_; + + TableConstructor(); +}; + +// A helper class that converts internal format keys into user keys +class KeyConvertingIterator : public Iterator { + public: + explicit KeyConvertingIterator(Iterator* iter) : iter_(iter) {} + + KeyConvertingIterator(const KeyConvertingIterator&) = delete; + KeyConvertingIterator& operator=(const KeyConvertingIterator&) = delete; + + ~KeyConvertingIterator() override { delete iter_; } + + bool Valid() const override { return iter_->Valid(); } + void Seek(const Slice& target) override { + ParsedInternalKey ikey(target, kMaxSequenceNumber, kTypeValue); + std::string encoded; + AppendInternalKey(&encoded, ikey); + iter_->Seek(encoded); + } + void SeekToFirst() override { iter_->SeekToFirst(); } + void SeekToLast() override { iter_->SeekToLast(); } + void Next() override { iter_->Next(); } + void Prev() override { iter_->Prev(); } + + Slice key() const override { + assert(Valid()); + ParsedInternalKey key; + if (!ParseInternalKey(iter_->key(), &key)) { + status_ = Status::Corruption("malformed internal key"); + return Slice("corrupted key"); + } + return key.user_key; + } + + Slice value() const override { return iter_->value(); } + Status status() const override { + return status_.ok() ? iter_->status() : status_; + } + + private: + mutable Status status_; + Iterator* iter_; +}; + +class MemTableConstructor : public Constructor { + public: + explicit MemTableConstructor(const Comparator* cmp) + : Constructor(cmp), internal_comparator_(cmp) { + memtable_ = new MemTable(internal_comparator_); + memtable_->Ref(); + } + ~MemTableConstructor() override { memtable_->Unref(); } + Status FinishImpl(const Options& options, const KVMap& data) override { + memtable_->Unref(); + memtable_ = new MemTable(internal_comparator_); + memtable_->Ref(); + int seq = 1; + for (const auto& kvp : data) { + memtable_->Add(seq, kTypeValue, kvp.first, kvp.second); + seq++; + } + return Status::OK(); + } + Iterator* NewIterator() const override { + return new KeyConvertingIterator(memtable_->NewIterator()); + } + + private: + const InternalKeyComparator internal_comparator_; + MemTable* memtable_; +}; + +class DBConstructor : public Constructor { + public: + explicit DBConstructor(const Comparator* cmp) + : Constructor(cmp), comparator_(cmp) { + db_ = nullptr; + NewDB(); + } + ~DBConstructor() override { delete db_; } + Status FinishImpl(const Options& options, const KVMap& data) override { + delete db_; + db_ = nullptr; + NewDB(); + for (const auto& kvp : data) { + WriteBatch batch; + batch.Put(kvp.first, kvp.second); + ASSERT_TRUE(db_->Write(WriteOptions(), &batch).ok()); + } + return Status::OK(); + } + Iterator* NewIterator() const override { + return db_->NewIterator(ReadOptions()); + } + + DB* db() const override { return db_; } + + private: + void NewDB() { + std::string name = test::TmpDir() + "/table_testdb"; + + Options options; + options.comparator = comparator_; + Status status = DestroyDB(name, options); + ASSERT_TRUE(status.ok()) << status.ToString(); + + options.create_if_missing = true; + options.error_if_exists = true; + options.write_buffer_size = 10000; // Something small to force merging + status = DB::Open(options, name, &db_); + ASSERT_TRUE(status.ok()) << status.ToString(); + } + + const Comparator* const comparator_; + DB* db_; +}; + +enum TestType { TABLE_TEST, BLOCK_TEST, MEMTABLE_TEST, DB_TEST }; + +struct TestArgs { + TestType type; + bool reverse_compare; + int restart_interval; +}; + +static const TestArgs kTestArgList[] = { + {TABLE_TEST, false, 16}, + {TABLE_TEST, false, 1}, + {TABLE_TEST, false, 1024}, + {TABLE_TEST, true, 16}, + {TABLE_TEST, true, 1}, + {TABLE_TEST, true, 1024}, + + {BLOCK_TEST, false, 16}, + {BLOCK_TEST, false, 1}, + {BLOCK_TEST, false, 1024}, + {BLOCK_TEST, true, 16}, + {BLOCK_TEST, true, 1}, + {BLOCK_TEST, true, 1024}, + + // Restart interval does not matter for memtables + {MEMTABLE_TEST, false, 16}, + {MEMTABLE_TEST, true, 16}, + + // Do not bother with restart interval variations for DB + {DB_TEST, false, 16}, + {DB_TEST, true, 16}, +}; +static const int kNumTestArgs = sizeof(kTestArgList) / sizeof(kTestArgList[0]); + +class Harness { + public: + Harness() : constructor_(nullptr) {} + + void Init(const TestArgs& args) { + delete constructor_; + constructor_ = nullptr; + options_ = Options(); + + options_.block_restart_interval = args.restart_interval; + // Use shorter block size for tests to exercise block boundary + // conditions more. + options_.block_size = 256; + if (args.reverse_compare) { + options_.comparator = &reverse_key_comparator; + } + switch (args.type) { + case TABLE_TEST: + constructor_ = new TableConstructor(options_.comparator); + break; + case BLOCK_TEST: + constructor_ = new BlockConstructor(options_.comparator); + break; + case MEMTABLE_TEST: + constructor_ = new MemTableConstructor(options_.comparator); + break; + case DB_TEST: + constructor_ = new DBConstructor(options_.comparator); + break; + } + } + + ~Harness() { delete constructor_; } + + void Add(const std::string& key, const std::string& value) { + constructor_->Add(key, value); + } + + void Test(Random* rnd) { + std::vector<std::string> keys; + KVMap data; + constructor_->Finish(options_, &keys, &data); + + TestForwardScan(keys, data); + TestBackwardScan(keys, data); + TestRandomAccess(rnd, keys, data); + } + + void TestForwardScan(const std::vector<std::string>& keys, + const KVMap& data) { + Iterator* iter = constructor_->NewIterator(); + ASSERT_TRUE(!iter->Valid()); + iter->SeekToFirst(); + for (KVMap::const_iterator model_iter = data.begin(); + model_iter != data.end(); ++model_iter) { + ASSERT_EQ(ToString(data, model_iter), ToString(iter)); + iter->Next(); + } + ASSERT_TRUE(!iter->Valid()); + delete iter; + } + + void TestBackwardScan(const std::vector<std::string>& keys, + const KVMap& data) { + Iterator* iter = constructor_->NewIterator(); + ASSERT_TRUE(!iter->Valid()); + iter->SeekToLast(); + for (KVMap::const_reverse_iterator model_iter = data.rbegin(); + model_iter != data.rend(); ++model_iter) { + ASSERT_EQ(ToString(data, model_iter), ToString(iter)); + iter->Prev(); + } + ASSERT_TRUE(!iter->Valid()); + delete iter; + } + + void TestRandomAccess(Random* rnd, const std::vector<std::string>& keys, + const KVMap& data) { + static const bool kVerbose = false; + Iterator* iter = constructor_->NewIterator(); + ASSERT_TRUE(!iter->Valid()); + KVMap::const_iterator model_iter = data.begin(); + if (kVerbose) fprintf(stderr, "---\n"); + for (int i = 0; i < 200; i++) { + const int toss = rnd->Uniform(5); + switch (toss) { + case 0: { + if (iter->Valid()) { + if (kVerbose) fprintf(stderr, "Next\n"); + iter->Next(); + ++model_iter; + ASSERT_EQ(ToString(data, model_iter), ToString(iter)); + } + break; + } + + case 1: { + if (kVerbose) fprintf(stderr, "SeekToFirst\n"); + iter->SeekToFirst(); + model_iter = data.begin(); + ASSERT_EQ(ToString(data, model_iter), ToString(iter)); + break; + } + + case 2: { + std::string key = PickRandomKey(rnd, keys); + model_iter = data.lower_bound(key); + if (kVerbose) + fprintf(stderr, "Seek '%s'\n", EscapeString(key).c_str()); + iter->Seek(Slice(key)); + ASSERT_EQ(ToString(data, model_iter), ToString(iter)); + break; + } + + case 3: { + if (iter->Valid()) { + if (kVerbose) fprintf(stderr, "Prev\n"); + iter->Prev(); + if (model_iter == data.begin()) { + model_iter = data.end(); // Wrap around to invalid value + } else { + --model_iter; + } + ASSERT_EQ(ToString(data, model_iter), ToString(iter)); + } + break; + } + + case 4: { + if (kVerbose) fprintf(stderr, "SeekToLast\n"); + iter->SeekToLast(); + if (keys.empty()) { + model_iter = data.end(); + } else { + std::string last = data.rbegin()->first; + model_iter = data.lower_bound(last); + } + ASSERT_EQ(ToString(data, model_iter), ToString(iter)); + break; + } + } + } + delete iter; + } + + std::string ToString(const KVMap& data, const KVMap::const_iterator& it) { + if (it == data.end()) { + return "END"; + } else { + return "'" + it->first + "->" + it->second + "'"; + } + } + + std::string ToString(const KVMap& data, + const KVMap::const_reverse_iterator& it) { + if (it == data.rend()) { + return "END"; + } else { + return "'" + it->first + "->" + it->second + "'"; + } + } + + std::string ToString(const Iterator* it) { + if (!it->Valid()) { + return "END"; + } else { + return "'" + it->key().ToString() + "->" + it->value().ToString() + "'"; + } + } + + std::string PickRandomKey(Random* rnd, const std::vector<std::string>& keys) { + if (keys.empty()) { + return "foo"; + } else { + const int index = rnd->Uniform(keys.size()); + std::string result = keys[index]; + switch (rnd->Uniform(3)) { + case 0: + // Return an existing key + break; + case 1: { + // Attempt to return something smaller than an existing key + if (!result.empty() && result[result.size() - 1] > '\0') { + result[result.size() - 1]--; + } + break; + } + case 2: { + // Return something larger than an existing key + Increment(options_.comparator, &result); + break; + } + } + return result; + } + } + + // Returns nullptr if not running against a DB + DB* db() const { return constructor_->db(); } + + private: + Options options_; + Constructor* constructor_; +}; + +// Test empty table/block. +TEST(Harness, Empty) { + for (int i = 0; i < kNumTestArgs; i++) { + Init(kTestArgList[i]); + Random rnd(test::RandomSeed() + 1); + Test(&rnd); + } +} + +// Special test for a block with no restart entries. The C++ leveldb +// code never generates such blocks, but the Java version of leveldb +// seems to. +TEST(Harness, ZeroRestartPointsInBlock) { + char data[sizeof(uint32_t)]; + memset(data, 0, sizeof(data)); + BlockContents contents; + contents.data = Slice(data, sizeof(data)); + contents.cachable = false; + contents.heap_allocated = false; + Block block(contents); + Iterator* iter = block.NewIterator(BytewiseComparator()); + iter->SeekToFirst(); + ASSERT_TRUE(!iter->Valid()); + iter->SeekToLast(); + ASSERT_TRUE(!iter->Valid()); + iter->Seek("foo"); + ASSERT_TRUE(!iter->Valid()); + delete iter; +} + +// Test the empty key +TEST(Harness, SimpleEmptyKey) { + for (int i = 0; i < kNumTestArgs; i++) { + Init(kTestArgList[i]); + Random rnd(test::RandomSeed() + 1); + Add("", "v"); + Test(&rnd); + } +} + +TEST(Harness, SimpleSingle) { + for (int i = 0; i < kNumTestArgs; i++) { + Init(kTestArgList[i]); + Random rnd(test::RandomSeed() + 2); + Add("abc", "v"); + Test(&rnd); + } +} + +TEST(Harness, SimpleMulti) { + for (int i = 0; i < kNumTestArgs; i++) { + Init(kTestArgList[i]); + Random rnd(test::RandomSeed() + 3); + Add("abc", "v"); + Add("abcd", "v"); + Add("ac", "v2"); + Test(&rnd); + } +} + +TEST(Harness, SimpleSpecialKey) { + for (int i = 0; i < kNumTestArgs; i++) { + Init(kTestArgList[i]); + Random rnd(test::RandomSeed() + 4); + Add("\xff\xff", "v3"); + Test(&rnd); + } +} + +TEST(Harness, Randomized) { + for (int i = 0; i < kNumTestArgs; i++) { + Init(kTestArgList[i]); + Random rnd(test::RandomSeed() + 5); + for (int num_entries = 0; num_entries < 2000; + num_entries += (num_entries < 50 ? 1 : 200)) { + if ((num_entries % 10) == 0) { + fprintf(stderr, "case %d of %d: num_entries = %d\n", (i + 1), + int(kNumTestArgs), num_entries); + } + for (int e = 0; e < num_entries; e++) { + std::string v; + Add(test::RandomKey(&rnd, rnd.Skewed(4)), + test::RandomString(&rnd, rnd.Skewed(5), &v).ToString()); + } + Test(&rnd); + } + } +} + +TEST(Harness, RandomizedLongDB) { + Random rnd(test::RandomSeed()); + TestArgs args = {DB_TEST, false, 16}; + Init(args); + int num_entries = 100000; + for (int e = 0; e < num_entries; e++) { + std::string v; + Add(test::RandomKey(&rnd, rnd.Skewed(4)), + test::RandomString(&rnd, rnd.Skewed(5), &v).ToString()); + } + Test(&rnd); + + // We must have created enough data to force merging + int files = 0; + for (int level = 0; level < config::kNumLevels; level++) { + std::string value; + char name[100]; + snprintf(name, sizeof(name), "leveldb.num-files-at-level%d", level); + ASSERT_TRUE(db()->GetProperty(name, &value)); + files += atoi(value.c_str()); + } + ASSERT_GT(files, 0); +} + +class MemTableTest {}; + +TEST(MemTableTest, Simple) { + InternalKeyComparator cmp(BytewiseComparator()); + MemTable* memtable = new MemTable(cmp); + memtable->Ref(); + WriteBatch batch; + WriteBatchInternal::SetSequence(&batch, 100); + batch.Put(std::string("k1"), std::string("v1")); + batch.Put(std::string("k2"), std::string("v2")); + batch.Put(std::string("k3"), std::string("v3")); + batch.Put(std::string("largekey"), std::string("vlarge")); + ASSERT_TRUE(WriteBatchInternal::InsertInto(&batch, memtable).ok()); + + Iterator* iter = memtable->NewIterator(); + iter->SeekToFirst(); + while (iter->Valid()) { + fprintf(stderr, "key: '%s' -> '%s'\n", iter->key().ToString().c_str(), + iter->value().ToString().c_str()); + iter->Next(); + } + + delete iter; + memtable->Unref(); +} + +static bool Between(uint64_t val, uint64_t low, uint64_t high) { + bool result = (val >= low) && (val <= high); + if (!result) { + fprintf(stderr, "Value %llu is not in range [%llu, %llu]\n", + (unsigned long long)(val), (unsigned long long)(low), + (unsigned long long)(high)); + } + return result; +} + +class TableTest {}; + +TEST(TableTest, ApproximateOffsetOfPlain) { + TableConstructor c(BytewiseComparator()); + c.Add("k01", "hello"); + c.Add("k02", "hello2"); + c.Add("k03", std::string(10000, 'x')); + c.Add("k04", std::string(200000, 'x')); + c.Add("k05", std::string(300000, 'x')); + c.Add("k06", "hello3"); + c.Add("k07", std::string(100000, 'x')); + std::vector<std::string> keys; + KVMap kvmap; + Options options; + options.block_size = 1024; + options.compression = kNoCompression; + c.Finish(options, &keys, &kvmap); + + ASSERT_TRUE(Between(c.ApproximateOffsetOf("abc"), 0, 0)); + ASSERT_TRUE(Between(c.ApproximateOffsetOf("k01"), 0, 0)); + ASSERT_TRUE(Between(c.ApproximateOffsetOf("k01a"), 0, 0)); + ASSERT_TRUE(Between(c.ApproximateOffsetOf("k02"), 0, 0)); + ASSERT_TRUE(Between(c.ApproximateOffsetOf("k03"), 0, 0)); + ASSERT_TRUE(Between(c.ApproximateOffsetOf("k04"), 10000, 11000)); + ASSERT_TRUE(Between(c.ApproximateOffsetOf("k04a"), 210000, 211000)); + ASSERT_TRUE(Between(c.ApproximateOffsetOf("k05"), 210000, 211000)); + ASSERT_TRUE(Between(c.ApproximateOffsetOf("k06"), 510000, 511000)); + ASSERT_TRUE(Between(c.ApproximateOffsetOf("k07"), 510000, 511000)); + ASSERT_TRUE(Between(c.ApproximateOffsetOf("xyz"), 610000, 612000)); +} + +static bool SnappyCompressionSupported() { + std::string out; + Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + return port::Snappy_Compress(in.data(), in.size(), &out); +} + +TEST(TableTest, ApproximateOffsetOfCompressed) { + if (!SnappyCompressionSupported()) { + fprintf(stderr, "skipping compression tests\n"); + return; + } + + Random rnd(301); + TableConstructor c(BytewiseComparator()); + std::string tmp; + c.Add("k01", "hello"); + c.Add("k02", test::CompressibleString(&rnd, 0.25, 10000, &tmp)); + c.Add("k03", "hello3"); + c.Add("k04", test::CompressibleString(&rnd, 0.25, 10000, &tmp)); + std::vector<std::string> keys; + KVMap kvmap; + Options options; + options.block_size = 1024; + options.compression = kSnappyCompression; + c.Finish(options, &keys, &kvmap); + + // Expected upper and lower bounds of space used by compressible strings. + static const int kSlop = 1000; // Compressor effectiveness varies. + const int expected = 2500; // 10000 * compression ratio (0.25) + const int min_z = expected - kSlop; + const int max_z = expected + kSlop; + + ASSERT_TRUE(Between(c.ApproximateOffsetOf("abc"), 0, kSlop)); + ASSERT_TRUE(Between(c.ApproximateOffsetOf("k01"), 0, kSlop)); + ASSERT_TRUE(Between(c.ApproximateOffsetOf("k02"), 0, kSlop)); + // Have now emitted a large compressible string, so adjust expected offset. + ASSERT_TRUE(Between(c.ApproximateOffsetOf("k03"), min_z, max_z)); + ASSERT_TRUE(Between(c.ApproximateOffsetOf("k04"), min_z, max_z)); + // Have now emitted two large compressible strings, so adjust expected offset. + ASSERT_TRUE(Between(c.ApproximateOffsetOf("xyz"), 2 * min_z, 2 * max_z)); +} + +} // namespace leveldb + +int main(int argc, char** argv) { return leveldb::test::RunAllTests(); } diff --git a/src/leveldb/table/two_level_iterator.cc b/src/leveldb/table/two_level_iterator.cc new file mode 100644 index 0000000000..144790dd97 --- /dev/null +++ b/src/leveldb/table/two_level_iterator.cc @@ -0,0 +1,171 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "table/two_level_iterator.h" + +#include "leveldb/table.h" +#include "table/block.h" +#include "table/format.h" +#include "table/iterator_wrapper.h" + +namespace leveldb { + +namespace { + +typedef Iterator* (*BlockFunction)(void*, const ReadOptions&, const Slice&); + +class TwoLevelIterator : public Iterator { + public: + TwoLevelIterator(Iterator* index_iter, BlockFunction block_function, + void* arg, const ReadOptions& options); + + ~TwoLevelIterator() override; + + void Seek(const Slice& target) override; + void SeekToFirst() override; + void SeekToLast() override; + void Next() override; + void Prev() override; + + bool Valid() const override { return data_iter_.Valid(); } + Slice key() const override { + assert(Valid()); + return data_iter_.key(); + } + Slice value() const override { + assert(Valid()); + return data_iter_.value(); + } + Status status() const override { + // It'd be nice if status() returned a const Status& instead of a Status + if (!index_iter_.status().ok()) { + return index_iter_.status(); + } else if (data_iter_.iter() != nullptr && !data_iter_.status().ok()) { + return data_iter_.status(); + } else { + return status_; + } + } + + private: + void SaveError(const Status& s) { + if (status_.ok() && !s.ok()) status_ = s; + } + void SkipEmptyDataBlocksForward(); + void SkipEmptyDataBlocksBackward(); + void SetDataIterator(Iterator* data_iter); + void InitDataBlock(); + + BlockFunction block_function_; + void* arg_; + const ReadOptions options_; + Status status_; + IteratorWrapper index_iter_; + IteratorWrapper data_iter_; // May be nullptr + // If data_iter_ is non-null, then "data_block_handle_" holds the + // "index_value" passed to block_function_ to create the data_iter_. + std::string data_block_handle_; +}; + +TwoLevelIterator::TwoLevelIterator(Iterator* index_iter, + BlockFunction block_function, void* arg, + const ReadOptions& options) + : block_function_(block_function), + arg_(arg), + options_(options), + index_iter_(index_iter), + data_iter_(nullptr) {} + +TwoLevelIterator::~TwoLevelIterator() = default; + +void TwoLevelIterator::Seek(const Slice& target) { + index_iter_.Seek(target); + InitDataBlock(); + if (data_iter_.iter() != nullptr) data_iter_.Seek(target); + SkipEmptyDataBlocksForward(); +} + +void TwoLevelIterator::SeekToFirst() { + index_iter_.SeekToFirst(); + InitDataBlock(); + if (data_iter_.iter() != nullptr) data_iter_.SeekToFirst(); + SkipEmptyDataBlocksForward(); +} + +void TwoLevelIterator::SeekToLast() { + index_iter_.SeekToLast(); + InitDataBlock(); + if (data_iter_.iter() != nullptr) data_iter_.SeekToLast(); + SkipEmptyDataBlocksBackward(); +} + +void TwoLevelIterator::Next() { + assert(Valid()); + data_iter_.Next(); + SkipEmptyDataBlocksForward(); +} + +void TwoLevelIterator::Prev() { + assert(Valid()); + data_iter_.Prev(); + SkipEmptyDataBlocksBackward(); +} + +void TwoLevelIterator::SkipEmptyDataBlocksForward() { + while (data_iter_.iter() == nullptr || !data_iter_.Valid()) { + // Move to next block + if (!index_iter_.Valid()) { + SetDataIterator(nullptr); + return; + } + index_iter_.Next(); + InitDataBlock(); + if (data_iter_.iter() != nullptr) data_iter_.SeekToFirst(); + } +} + +void TwoLevelIterator::SkipEmptyDataBlocksBackward() { + while (data_iter_.iter() == nullptr || !data_iter_.Valid()) { + // Move to next block + if (!index_iter_.Valid()) { + SetDataIterator(nullptr); + return; + } + index_iter_.Prev(); + InitDataBlock(); + if (data_iter_.iter() != nullptr) data_iter_.SeekToLast(); + } +} + +void TwoLevelIterator::SetDataIterator(Iterator* data_iter) { + if (data_iter_.iter() != nullptr) SaveError(data_iter_.status()); + data_iter_.Set(data_iter); +} + +void TwoLevelIterator::InitDataBlock() { + if (!index_iter_.Valid()) { + SetDataIterator(nullptr); + } else { + Slice handle = index_iter_.value(); + if (data_iter_.iter() != nullptr && + handle.compare(data_block_handle_) == 0) { + // data_iter_ is already constructed with this iterator, so + // no need to change anything + } else { + Iterator* iter = (*block_function_)(arg_, options_, handle); + data_block_handle_.assign(handle.data(), handle.size()); + SetDataIterator(iter); + } + } +} + +} // namespace + +Iterator* NewTwoLevelIterator(Iterator* index_iter, + BlockFunction block_function, void* arg, + const ReadOptions& options) { + return new TwoLevelIterator(index_iter, block_function, arg, options); +} + +} // namespace leveldb diff --git a/src/leveldb/table/two_level_iterator.h b/src/leveldb/table/two_level_iterator.h new file mode 100644 index 0000000000..81ffe809ac --- /dev/null +++ b/src/leveldb/table/two_level_iterator.h @@ -0,0 +1,31 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#ifndef STORAGE_LEVELDB_TABLE_TWO_LEVEL_ITERATOR_H_ +#define STORAGE_LEVELDB_TABLE_TWO_LEVEL_ITERATOR_H_ + +#include "leveldb/iterator.h" + +namespace leveldb { + +struct ReadOptions; + +// Return a new two level iterator. A two-level iterator contains an +// index iterator whose values point to a sequence of blocks where +// each block is itself a sequence of key,value pairs. The returned +// two-level iterator yields the concatenation of all key/value pairs +// in the sequence of blocks. Takes ownership of "index_iter" and +// will delete it when no longer needed. +// +// Uses a supplied function to convert an index_iter value into +// an iterator over the contents of the corresponding block. +Iterator* NewTwoLevelIterator( + Iterator* index_iter, + Iterator* (*block_function)(void* arg, const ReadOptions& options, + const Slice& index_value), + void* arg, const ReadOptions& options); + +} // namespace leveldb + +#endif // STORAGE_LEVELDB_TABLE_TWO_LEVEL_ITERATOR_H_ |