aboutsummaryrefslogtreecommitdiff
path: root/src/leveldb/db
diff options
context:
space:
mode:
Diffstat (limited to 'src/leveldb/db')
-rw-r--r--src/leveldb/db/db_test.cc75
-rw-r--r--src/leveldb/db/version_set.cc36
-rw-r--r--src/leveldb/db/version_set.h2
3 files changed, 113 insertions, 0 deletions
diff --git a/src/leveldb/db/db_test.cc b/src/leveldb/db/db_test.cc
index 74abd13e5c..684ea3bdbc 100644
--- a/src/leveldb/db/db_test.cc
+++ b/src/leveldb/db/db_test.cc
@@ -59,6 +59,12 @@ class SpecialEnv : public EnvWrapper {
// Simulate non-writable file system while this pointer is non-NULL
port::AtomicPointer non_writable_;
+ // Force sync of manifest files to fail while this pointer is non-NULL
+ port::AtomicPointer manifest_sync_error_;
+
+ // Force write to manifest files to fail while this pointer is non-NULL
+ port::AtomicPointer manifest_write_error_;
+
bool count_random_reads_;
AtomicCounter random_read_counter_;
@@ -69,6 +75,8 @@ class SpecialEnv : public EnvWrapper {
no_space_.Release_Store(NULL);
non_writable_.Release_Store(NULL);
count_random_reads_ = false;
+ manifest_sync_error_.Release_Store(NULL);
+ manifest_write_error_.Release_Store(NULL);
}
Status NewWritableFile(const std::string& f, WritableFile** r) {
@@ -100,6 +108,30 @@ class SpecialEnv : public EnvWrapper {
return base_->Sync();
}
};
+ class ManifestFile : public WritableFile {
+ private:
+ SpecialEnv* env_;
+ WritableFile* base_;
+ public:
+ ManifestFile(SpecialEnv* env, WritableFile* b) : env_(env), base_(b) { }
+ ~ManifestFile() { delete base_; }
+ Status Append(const Slice& data) {
+ if (env_->manifest_write_error_.Acquire_Load() != NULL) {
+ return Status::IOError("simulated writer error");
+ } else {
+ return base_->Append(data);
+ }
+ }
+ Status Close() { return base_->Close(); }
+ Status Flush() { return base_->Flush(); }
+ Status Sync() {
+ if (env_->manifest_sync_error_.Acquire_Load() != NULL) {
+ return Status::IOError("simulated sync error");
+ } else {
+ return base_->Sync();
+ }
+ }
+ };
if (non_writable_.Acquire_Load() != NULL) {
return Status::IOError("simulated write error");
@@ -109,6 +141,8 @@ class SpecialEnv : public EnvWrapper {
if (s.ok()) {
if (strstr(f.c_str(), ".sst") != NULL) {
*r = new SSTableFile(this, *r);
+ } else if (strstr(f.c_str(), "MANIFEST") != NULL) {
+ *r = new ManifestFile(this, *r);
}
}
return s;
@@ -1492,6 +1526,47 @@ TEST(DBTest, NonWritableFileSystem) {
env_->non_writable_.Release_Store(NULL);
}
+TEST(DBTest, ManifestWriteError) {
+ // Test for the following problem:
+ // (a) Compaction produces file F
+ // (b) Log record containing F is written to MANIFEST file, but Sync() fails
+ // (c) GC deletes F
+ // (d) After reopening DB, reads fail since deleted F is named in log record
+
+ // We iterate twice. In the second iteration, everything is the
+ // same except the log record never makes it to the MANIFEST file.
+ for (int iter = 0; iter < 2; iter++) {
+ port::AtomicPointer* error_type = (iter == 0)
+ ? &env_->manifest_sync_error_
+ : &env_->manifest_write_error_;
+
+ // Insert foo=>bar mapping
+ Options options = CurrentOptions();
+ options.env = env_;
+ options.create_if_missing = true;
+ options.error_if_exists = false;
+ DestroyAndReopen(&options);
+ ASSERT_OK(Put("foo", "bar"));
+ ASSERT_EQ("bar", Get("foo"));
+
+ // Memtable compaction (will succeed)
+ dbfull()->TEST_CompactMemTable();
+ ASSERT_EQ("bar", Get("foo"));
+ const int last = config::kMaxMemCompactLevel;
+ ASSERT_EQ(NumTableFilesAtLevel(last), 1); // foo=>bar is now in last level
+
+ // Merging compaction (will fail)
+ error_type->Release_Store(env_);
+ dbfull()->TEST_CompactRange(last, NULL, NULL); // Should fail
+ ASSERT_EQ("bar", Get("foo"));
+
+ // Recovery: should not lose data
+ error_type->Release_Store(NULL);
+ Reopen(&options);
+ ASSERT_EQ("bar", Get("foo"));
+ }
+}
+
TEST(DBTest, FilesDeletedAfterCompaction) {
ASSERT_OK(Put("foo", "v2"));
Compact("a", "z");
diff --git a/src/leveldb/db/version_set.cc b/src/leveldb/db/version_set.cc
index bdd4a579b7..7d0a5de2b9 100644
--- a/src/leveldb/db/version_set.cc
+++ b/src/leveldb/db/version_set.cc
@@ -786,12 +786,23 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
if (s.ok()) {
s = descriptor_file_->Sync();
}
+ if (!s.ok()) {
+ Log(options_->info_log, "MANIFEST write: %s\n", s.ToString().c_str());
+ if (ManifestContains(record)) {
+ Log(options_->info_log,
+ "MANIFEST contains log record despite error; advancing to new "
+ "version to prevent mismatch between in-memory and logged state");
+ s = Status::OK();
+ }
+ }
}
// If we just created a new descriptor file, install it by writing a
// new CURRENT file that points to it.
if (s.ok() && !new_manifest_file.empty()) {
s = SetCurrentFile(env_, dbname_, manifest_file_number_);
+ // No need to double-check MANIFEST in case of error since it
+ // will be discarded below.
}
mu->Lock();
@@ -1025,6 +1036,31 @@ const char* VersionSet::LevelSummary(LevelSummaryStorage* scratch) const {
return scratch->buffer;
}
+// Return true iff the manifest contains the specified record.
+bool VersionSet::ManifestContains(const std::string& record) const {
+ std::string fname = DescriptorFileName(dbname_, manifest_file_number_);
+ Log(options_->info_log, "ManifestContains: checking %s\n", fname.c_str());
+ SequentialFile* file = NULL;
+ Status s = env_->NewSequentialFile(fname, &file);
+ if (!s.ok()) {
+ Log(options_->info_log, "ManifestContains: %s\n", s.ToString().c_str());
+ return false;
+ }
+ log::Reader reader(file, NULL, true/*checksum*/, 0);
+ Slice r;
+ std::string scratch;
+ bool result = false;
+ while (reader.ReadRecord(&r, &scratch)) {
+ if (r == Slice(record)) {
+ result = true;
+ break;
+ }
+ }
+ delete file;
+ Log(options_->info_log, "ManifestContains: result = %d\n", result ? 1 : 0);
+ return result;
+}
+
uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) {
uint64_t result = 0;
for (int level = 0; level < config::kNumLevels; level++) {
diff --git a/src/leveldb/db/version_set.h b/src/leveldb/db/version_set.h
index 792899b7f8..9d084fdb7d 100644
--- a/src/leveldb/db/version_set.h
+++ b/src/leveldb/db/version_set.h
@@ -277,6 +277,8 @@ class VersionSet {
void AppendVersion(Version* v);
+ bool ManifestContains(const std::string& record) const;
+
Env* const env_;
const std::string dbname_;
const Options* const options_;