aboutsummaryrefslogtreecommitdiff
path: root/src/leveldb/util
diff options
context:
space:
mode:
Diffstat (limited to 'src/leveldb/util')
-rw-r--r--src/leveldb/util/arena.cc6
-rw-r--r--src/leveldb/util/arena.h10
-rw-r--r--src/leveldb/util/bloom.cc2
-rw-r--r--src/leveldb/util/bloom_test.cc3
-rw-r--r--src/leveldb/util/cache.cc136
-rw-r--r--src/leveldb/util/cache_test.cc42
-rw-r--r--src/leveldb/util/env.cc4
-rw-r--r--src/leveldb/util/env_posix.cc13
-rw-r--r--src/leveldb/util/env_win.cc37
-rw-r--r--src/leveldb/util/options.cc2
-rw-r--r--src/leveldb/util/testutil.h10
11 files changed, 218 insertions, 47 deletions
diff --git a/src/leveldb/util/arena.cc b/src/leveldb/util/arena.cc
index 9367f71492..74078213ee 100644
--- a/src/leveldb/util/arena.cc
+++ b/src/leveldb/util/arena.cc
@@ -9,8 +9,7 @@ namespace leveldb {
static const int kBlockSize = 4096;
-Arena::Arena() {
- blocks_memory_ = 0;
+Arena::Arena() : memory_usage_(0) {
alloc_ptr_ = NULL; // First allocation will allocate a block
alloc_bytes_remaining_ = 0;
}
@@ -60,8 +59,9 @@ char* Arena::AllocateAligned(size_t bytes) {
char* Arena::AllocateNewBlock(size_t block_bytes) {
char* result = new char[block_bytes];
- blocks_memory_ += block_bytes;
blocks_.push_back(result);
+ memory_usage_.NoBarrier_Store(
+ reinterpret_cast<void*>(MemoryUsage() + block_bytes + sizeof(char*)));
return result;
}
diff --git a/src/leveldb/util/arena.h b/src/leveldb/util/arena.h
index 73bbf1cb9b..48bab33741 100644
--- a/src/leveldb/util/arena.h
+++ b/src/leveldb/util/arena.h
@@ -9,6 +9,7 @@
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
+#include "port/port.h"
namespace leveldb {
@@ -24,10 +25,9 @@ class Arena {
char* AllocateAligned(size_t bytes);
// Returns an estimate of the total memory usage of data allocated
- // by the arena (including space allocated but not yet used for user
- // allocations).
+ // by the arena.
size_t MemoryUsage() const {
- return blocks_memory_ + blocks_.capacity() * sizeof(char*);
+ return reinterpret_cast<uintptr_t>(memory_usage_.NoBarrier_Load());
}
private:
@@ -41,8 +41,8 @@ class Arena {
// Array of new[] allocated memory blocks
std::vector<char*> blocks_;
- // Bytes of memory in blocks allocated so far
- size_t blocks_memory_;
+ // Total memory usage of the arena.
+ port::AtomicPointer memory_usage_;
// No copying allowed
Arena(const Arena&);
diff --git a/src/leveldb/util/bloom.cc b/src/leveldb/util/bloom.cc
index a27a2ace28..bf3e4ca6e9 100644
--- a/src/leveldb/util/bloom.cc
+++ b/src/leveldb/util/bloom.cc
@@ -47,7 +47,7 @@ class BloomFilterPolicy : public FilterPolicy {
dst->resize(init_size + bytes, 0);
dst->push_back(static_cast<char>(k_)); // Remember # of probes in filter
char* array = &(*dst)[init_size];
- for (size_t i = 0; i < n; i++) {
+ for (int i = 0; i < n; i++) {
// Use double-hashing to generate a sequence of hash values.
// See analysis in [Kirsch,Mitzenmacher 2006].
uint32_t h = BloomHash(keys[i]);
diff --git a/src/leveldb/util/bloom_test.cc b/src/leveldb/util/bloom_test.cc
index 77fb1b3159..1b87a2be3f 100644
--- a/src/leveldb/util/bloom_test.cc
+++ b/src/leveldb/util/bloom_test.cc
@@ -46,7 +46,8 @@ class BloomTest {
key_slices.push_back(Slice(keys_[i]));
}
filter_.clear();
- policy_->CreateFilter(&key_slices[0], key_slices.size(), &filter_);
+ policy_->CreateFilter(&key_slices[0], static_cast<int>(key_slices.size()),
+ &filter_);
keys_.clear();
if (kVerbose >= 2) DumpFilter();
}
diff --git a/src/leveldb/util/cache.cc b/src/leveldb/util/cache.cc
index 8b197bc02a..ce46886171 100644
--- a/src/leveldb/util/cache.cc
+++ b/src/leveldb/util/cache.cc
@@ -19,6 +19,23 @@ Cache::~Cache() {
namespace {
// LRU cache implementation
+//
+// Cache entries have an "in_cache" boolean indicating whether the cache has a
+// reference on the entry. The only ways that this can become false without the
+// entry being passed to its "deleter" are via Erase(), via Insert() when
+// an element with a duplicate key is inserted, or on destruction of the cache.
+//
+// The cache keeps two linked lists of items in the cache. All items in the
+// cache are in one list or the other, and never both. Items still referenced
+// by clients but erased from the cache are in neither list. The lists are:
+// - in-use: contains the items currently referenced by clients, in no
+// particular order. (This list is used for invariant checking. If we
+// removed the check, elements that would otherwise be on this list could be
+// left as disconnected singleton lists.)
+// - LRU: contains the items not currently referenced by clients, in LRU order
+// Elements are moved between these lists by the Ref() and Unref() methods,
+// when they detect an element in the cache acquiring or losing its only
+// external reference.
// An entry is a variable length heap-allocated structure. Entries
// are kept in a circular doubly linked list ordered by access time.
@@ -30,7 +47,8 @@ struct LRUHandle {
LRUHandle* prev;
size_t charge; // TODO(opt): Only allow uint32_t?
size_t key_length;
- uint32_t refs;
+ bool in_cache; // Whether entry is in the cache.
+ uint32_t refs; // References, including cache reference, if present.
uint32_t hash; // Hash of key(); used for fast sharding and comparisons
char key_data[1]; // Beginning of key
@@ -147,49 +165,77 @@ class LRUCache {
Cache::Handle* Lookup(const Slice& key, uint32_t hash);
void Release(Cache::Handle* handle);
void Erase(const Slice& key, uint32_t hash);
+ void Prune();
+ size_t TotalCharge() const {
+ MutexLock l(&mutex_);
+ return usage_;
+ }
private:
void LRU_Remove(LRUHandle* e);
- void LRU_Append(LRUHandle* e);
+ void LRU_Append(LRUHandle*list, LRUHandle* e);
+ void Ref(LRUHandle* e);
void Unref(LRUHandle* e);
+ bool FinishErase(LRUHandle* e);
// Initialized before use.
size_t capacity_;
// mutex_ protects the following state.
- port::Mutex mutex_;
+ mutable port::Mutex mutex_;
size_t usage_;
// Dummy head of LRU list.
// lru.prev is newest entry, lru.next is oldest entry.
+ // Entries have refs==1 and in_cache==true.
LRUHandle lru_;
+ // Dummy head of in-use list.
+ // Entries are in use by clients, and have refs >= 2 and in_cache==true.
+ LRUHandle in_use_;
+
HandleTable table_;
};
LRUCache::LRUCache()
: usage_(0) {
- // Make empty circular linked list
+ // Make empty circular linked lists.
lru_.next = &lru_;
lru_.prev = &lru_;
+ in_use_.next = &in_use_;
+ in_use_.prev = &in_use_;
}
LRUCache::~LRUCache() {
+ assert(in_use_.next == &in_use_); // Error if caller has an unreleased handle
for (LRUHandle* e = lru_.next; e != &lru_; ) {
LRUHandle* next = e->next;
- assert(e->refs == 1); // Error if caller has an unreleased handle
+ assert(e->in_cache);
+ e->in_cache = false;
+ assert(e->refs == 1); // Invariant of lru_ list.
Unref(e);
e = next;
}
}
+void LRUCache::Ref(LRUHandle* e) {
+ if (e->refs == 1 && e->in_cache) { // If on lru_ list, move to in_use_ list.
+ LRU_Remove(e);
+ LRU_Append(&in_use_, e);
+ }
+ e->refs++;
+}
+
void LRUCache::Unref(LRUHandle* e) {
assert(e->refs > 0);
e->refs--;
- if (e->refs <= 0) {
- usage_ -= e->charge;
+ if (e->refs == 0) { // Deallocate.
+ assert(!e->in_cache);
(*e->deleter)(e->key(), e->value);
free(e);
+ } else if (e->in_cache && e->refs == 1) { // No longer in use; move to lru_ list.
+ LRU_Remove(e);
+ LRU_Append(&lru_, e);
}
}
@@ -198,10 +244,10 @@ void LRUCache::LRU_Remove(LRUHandle* e) {
e->prev->next = e->next;
}
-void LRUCache::LRU_Append(LRUHandle* e) {
- // Make "e" newest entry by inserting just before lru_
- e->next = &lru_;
- e->prev = lru_.prev;
+void LRUCache::LRU_Append(LRUHandle* list, LRUHandle* e) {
+ // Make "e" newest entry by inserting just before *list
+ e->next = list;
+ e->prev = list->prev;
e->prev->next = e;
e->next->prev = e;
}
@@ -210,9 +256,7 @@ Cache::Handle* LRUCache::Lookup(const Slice& key, uint32_t hash) {
MutexLock l(&mutex_);
LRUHandle* e = table_.Lookup(key, hash);
if (e != NULL) {
- e->refs++;
- LRU_Remove(e);
- LRU_Append(e);
+ Ref(e);
}
return reinterpret_cast<Cache::Handle*>(e);
}
@@ -234,34 +278,58 @@ Cache::Handle* LRUCache::Insert(
e->charge = charge;
e->key_length = key.size();
e->hash = hash;
- e->refs = 2; // One from LRUCache, one for the returned handle
+ e->in_cache = false;
+ e->refs = 1; // for the returned handle.
memcpy(e->key_data, key.data(), key.size());
- LRU_Append(e);
- usage_ += charge;
- LRUHandle* old = table_.Insert(e);
- if (old != NULL) {
- LRU_Remove(old);
- Unref(old);
- }
+ if (capacity_ > 0) {
+ e->refs++; // for the cache's reference.
+ e->in_cache = true;
+ LRU_Append(&in_use_, e);
+ usage_ += charge;
+ FinishErase(table_.Insert(e));
+ } // else don't cache. (Tests use capacity_==0 to turn off caching.)
while (usage_ > capacity_ && lru_.next != &lru_) {
LRUHandle* old = lru_.next;
- LRU_Remove(old);
- table_.Remove(old->key(), old->hash);
- Unref(old);
+ assert(old->refs == 1);
+ bool erased = FinishErase(table_.Remove(old->key(), old->hash));
+ if (!erased) { // to avoid unused variable when compiled NDEBUG
+ assert(erased);
+ }
}
return reinterpret_cast<Cache::Handle*>(e);
}
-void LRUCache::Erase(const Slice& key, uint32_t hash) {
- MutexLock l(&mutex_);
- LRUHandle* e = table_.Remove(key, hash);
+// If e != NULL, finish removing *e from the cache; it has already been removed
+// from the hash table. Return whether e != NULL. Requires mutex_ held.
+bool LRUCache::FinishErase(LRUHandle* e) {
if (e != NULL) {
+ assert(e->in_cache);
LRU_Remove(e);
+ e->in_cache = false;
+ usage_ -= e->charge;
Unref(e);
}
+ return e != NULL;
+}
+
+void LRUCache::Erase(const Slice& key, uint32_t hash) {
+ MutexLock l(&mutex_);
+ FinishErase(table_.Remove(key, hash));
+}
+
+void LRUCache::Prune() {
+ MutexLock l(&mutex_);
+ while (lru_.next != &lru_) {
+ LRUHandle* e = lru_.next;
+ assert(e->refs == 1);
+ bool erased = FinishErase(table_.Remove(e->key(), e->hash));
+ if (!erased) { // to avoid unused variable when compiled NDEBUG
+ assert(erased);
+ }
+ }
}
static const int kNumShardBits = 4;
@@ -314,6 +382,18 @@ class ShardedLRUCache : public Cache {
MutexLock l(&id_mutex_);
return ++(last_id_);
}
+ virtual void Prune() {
+ for (int s = 0; s < kNumShards; s++) {
+ shard_[s].Prune();
+ }
+ }
+ virtual size_t TotalCharge() const {
+ size_t total = 0;
+ for (int s = 0; s < kNumShards; s++) {
+ total += shard_[s].TotalCharge();
+ }
+ return total;
+ }
};
} // end anonymous namespace
diff --git a/src/leveldb/util/cache_test.cc b/src/leveldb/util/cache_test.cc
index 43716715a8..468f7a6425 100644
--- a/src/leveldb/util/cache_test.cc
+++ b/src/leveldb/util/cache_test.cc
@@ -59,6 +59,11 @@ class CacheTest {
&CacheTest::Deleter));
}
+ Cache::Handle* InsertAndReturnHandle(int key, int value, int charge = 1) {
+ return cache_->Insert(EncodeKey(key), EncodeValue(value), charge,
+ &CacheTest::Deleter);
+ }
+
void Erase(int key) {
cache_->Erase(EncodeKey(key));
}
@@ -135,8 +140,11 @@ TEST(CacheTest, EntriesArePinned) {
TEST(CacheTest, EvictionPolicy) {
Insert(100, 101);
Insert(200, 201);
+ Insert(300, 301);
+ Cache::Handle* h = cache_->Lookup(EncodeKey(300));
- // Frequently used entry must be kept around
+ // Frequently used entry must be kept around,
+ // as must things that are still in use.
for (int i = 0; i < kCacheSize + 100; i++) {
Insert(1000+i, 2000+i);
ASSERT_EQ(2000+i, Lookup(1000+i));
@@ -144,6 +152,25 @@ TEST(CacheTest, EvictionPolicy) {
}
ASSERT_EQ(101, Lookup(100));
ASSERT_EQ(-1, Lookup(200));
+ ASSERT_EQ(301, Lookup(300));
+ cache_->Release(h);
+}
+
+TEST(CacheTest, UseExceedsCacheSize) {
+ // Overfill the cache, keeping handles on all inserted entries.
+ std::vector<Cache::Handle*> h;
+ for (int i = 0; i < kCacheSize + 100; i++) {
+ h.push_back(InsertAndReturnHandle(1000+i, 2000+i));
+ }
+
+ // Check that all the entries can be found in the cache.
+ for (int i = 0; i < h.size(); i++) {
+ ASSERT_EQ(2000+i, Lookup(1000+i));
+ }
+
+ for (int i = 0; i < h.size(); i++) {
+ cache_->Release(h[i]);
+ }
}
TEST(CacheTest, HeavyEntries) {
@@ -179,6 +206,19 @@ TEST(CacheTest, NewId) {
ASSERT_NE(a, b);
}
+TEST(CacheTest, Prune) {
+ Insert(1, 100);
+ Insert(2, 200);
+
+ Cache::Handle* handle = cache_->Lookup(EncodeKey(1));
+ ASSERT_TRUE(handle);
+ cache_->Prune();
+ cache_->Release(handle);
+
+ ASSERT_EQ(100, Lookup(1));
+ ASSERT_EQ(-1, Lookup(2));
+}
+
} // namespace leveldb
int main(int argc, char** argv) {
diff --git a/src/leveldb/util/env.cc b/src/leveldb/util/env.cc
index c2600e964a..c58a0821ef 100644
--- a/src/leveldb/util/env.cc
+++ b/src/leveldb/util/env.cc
@@ -9,6 +9,10 @@ namespace leveldb {
Env::~Env() {
}
+Status Env::NewAppendableFile(const std::string& fname, WritableFile** result) {
+ return Status::NotSupported("NewAppendableFile", fname);
+}
+
SequentialFile::~SequentialFile() {
}
diff --git a/src/leveldb/util/env_posix.cc b/src/leveldb/util/env_posix.cc
index ba2667864a..e0fca52f46 100644
--- a/src/leveldb/util/env_posix.cc
+++ b/src/leveldb/util/env_posix.cc
@@ -351,6 +351,19 @@ class PosixEnv : public Env {
return s;
}
+ virtual Status NewAppendableFile(const std::string& fname,
+ WritableFile** result) {
+ Status s;
+ FILE* f = fopen(fname.c_str(), "a");
+ if (f == NULL) {
+ *result = NULL;
+ s = IOError(fname, errno);
+ } else {
+ *result = new PosixWritableFile(fname, f);
+ }
+ return s;
+ }
+
virtual bool FileExists(const std::string& fname) {
return access(fname.c_str(), F_OK) == 0;
}
diff --git a/src/leveldb/util/env_win.cc b/src/leveldb/util/env_win.cc
index e11a96b791..b074b7579e 100644
--- a/src/leveldb/util/env_win.cc
+++ b/src/leveldb/util/env_win.cc
@@ -106,7 +106,7 @@ private:
class Win32WritableFile : public WritableFile
{
public:
- Win32WritableFile(const std::string& fname);
+ Win32WritableFile(const std::string& fname, bool append);
~Win32WritableFile();
virtual Status Append(const Slice& data);
@@ -158,6 +158,8 @@ public:
RandomAccessFile** result);
virtual Status NewWritableFile(const std::string& fname,
WritableFile** result);
+ virtual Status NewAppendableFile(const std::string& fname,
+ WritableFile** result);
virtual bool FileExists(const std::string& fname);
@@ -423,17 +425,23 @@ void Win32RandomAccessFile::_CleanUp()
}
}
-Win32WritableFile::Win32WritableFile(const std::string& fname)
+Win32WritableFile::Win32WritableFile(const std::string& fname, bool append)
: filename_(fname)
{
std::wstring path;
ToWidePath(fname, path);
- DWORD Flag = PathFileExistsW(path.c_str()) ? OPEN_EXISTING : CREATE_ALWAYS;
+ // NewAppendableFile: append to an existing file, or create a new one
+ // if none exists - this is OPEN_ALWAYS behavior, with
+ // FILE_APPEND_DATA to avoid having to manually position the file
+ // pointer at the end of the file.
+ // NewWritableFile: create a new file, delete if it exists - this is
+ // CREATE_ALWAYS behavior. This file is used for writing only so
+ // use GENERIC_WRITE.
_hFile = CreateFileW(path.c_str(),
- GENERIC_READ | GENERIC_WRITE,
+ append ? FILE_APPEND_DATA : GENERIC_WRITE,
FILE_SHARE_READ|FILE_SHARE_DELETE|FILE_SHARE_WRITE,
NULL,
- Flag,
+ append ? OPEN_ALWAYS : CREATE_ALWAYS,
FILE_ATTRIBUTE_NORMAL,
NULL);
// CreateFileW returns INVALID_HANDLE_VALUE in case of error, always check isEnable() before use
@@ -823,7 +831,9 @@ Status Win32Env::NewLogger( const std::string& fname, Logger** result )
{
Status sRet;
std::string path = fname;
- Win32WritableFile* pMapFile = new Win32WritableFile(ModifyPath(path));
+ // Logs are opened with write semantics, not with append semantics
+ // (see PosixEnv::NewLogger)
+ Win32WritableFile* pMapFile = new Win32WritableFile(ModifyPath(path), false);
if(!pMapFile->isEnable()){
delete pMapFile;
*result = NULL;
@@ -837,7 +847,20 @@ Status Win32Env::NewWritableFile( const std::string& fname, WritableFile** resul
{
Status sRet;
std::string path = fname;
- Win32WritableFile* pFile = new Win32WritableFile(ModifyPath(path));
+ Win32WritableFile* pFile = new Win32WritableFile(ModifyPath(path), false);
+ if(!pFile->isEnable()){
+ *result = NULL;
+ sRet = Status::IOError(fname,Win32::GetLastErrSz());
+ }else
+ *result = pFile;
+ return sRet;
+}
+
+Status Win32Env::NewAppendableFile( const std::string& fname, WritableFile** result )
+{
+ Status sRet;
+ std::string path = fname;
+ Win32WritableFile* pFile = new Win32WritableFile(ModifyPath(path), true);
if(!pFile->isEnable()){
*result = NULL;
sRet = Status::IOError(fname,Win32::GetLastErrSz());
diff --git a/src/leveldb/util/options.cc b/src/leveldb/util/options.cc
index 76af5b9302..8b618fb1ae 100644
--- a/src/leveldb/util/options.cc
+++ b/src/leveldb/util/options.cc
@@ -22,8 +22,8 @@ Options::Options()
block_size(4096),
block_restart_interval(16),
compression(kSnappyCompression),
+ reuse_logs(false),
filter_policy(NULL) {
}
-
} // namespace leveldb
diff --git a/src/leveldb/util/testutil.h b/src/leveldb/util/testutil.h
index adad3fc1ea..d7e4583702 100644
--- a/src/leveldb/util/testutil.h
+++ b/src/leveldb/util/testutil.h
@@ -45,6 +45,16 @@ class ErrorEnv : public EnvWrapper {
}
return target()->NewWritableFile(fname, result);
}
+
+ virtual Status NewAppendableFile(const std::string& fname,
+ WritableFile** result) {
+ if (writable_file_error_) {
+ ++num_writable_file_errors_;
+ *result = NULL;
+ return Status::IOError(fname, "fake error");
+ }
+ return target()->NewAppendableFile(fname, result);
+ }
};
} // namespace test