aboutsummaryrefslogtreecommitdiff
path: root/src/leveldb/db/version_set.cc
diff options
context:
space:
mode:
authorDavid Grogan <dgrogan@chromium.org>2013-01-07 13:17:43 -0800
committerGavin Andresen <gavinandresen@gmail.com>2013-01-23 14:00:29 -0500
commit36311b9a194e3546006478157279383ffa624cf0 (patch)
tree10874de5669607f5c2d3f443bdc23af5aafb3f25 /src/leveldb/db/version_set.cc
parentf79ddf24a967a0043270b2999921550c6c366ec4 (diff)
Fix corruption bug found and analyzed by dhruba@gmail.com
https://groups.google.com/d/msg/leveldb/Kc9JxuIUu5A/9P0N9RL4ar8J
Diffstat (limited to 'src/leveldb/db/version_set.cc')
-rw-r--r--src/leveldb/db/version_set.cc36
1 files changed, 36 insertions, 0 deletions
diff --git a/src/leveldb/db/version_set.cc b/src/leveldb/db/version_set.cc
index bdd4a579b7..7d0a5de2b9 100644
--- a/src/leveldb/db/version_set.cc
+++ b/src/leveldb/db/version_set.cc
@@ -786,12 +786,23 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
if (s.ok()) {
s = descriptor_file_->Sync();
}
+ if (!s.ok()) {
+ Log(options_->info_log, "MANIFEST write: %s\n", s.ToString().c_str());
+ if (ManifestContains(record)) {
+ Log(options_->info_log,
+ "MANIFEST contains log record despite error; advancing to new "
+ "version to prevent mismatch between in-memory and logged state");
+ s = Status::OK();
+ }
+ }
}
// If we just created a new descriptor file, install it by writing a
// new CURRENT file that points to it.
if (s.ok() && !new_manifest_file.empty()) {
s = SetCurrentFile(env_, dbname_, manifest_file_number_);
+ // No need to double-check MANIFEST in case of error since it
+ // will be discarded below.
}
mu->Lock();
@@ -1025,6 +1036,31 @@ const char* VersionSet::LevelSummary(LevelSummaryStorage* scratch) const {
return scratch->buffer;
}
+// Return true iff the manifest contains the specified record.
+bool VersionSet::ManifestContains(const std::string& record) const {
+ std::string fname = DescriptorFileName(dbname_, manifest_file_number_);
+ Log(options_->info_log, "ManifestContains: checking %s\n", fname.c_str());
+ SequentialFile* file = NULL;
+ Status s = env_->NewSequentialFile(fname, &file);
+ if (!s.ok()) {
+ Log(options_->info_log, "ManifestContains: %s\n", s.ToString().c_str());
+ return false;
+ }
+ log::Reader reader(file, NULL, true/*checksum*/, 0);
+ Slice r;
+ std::string scratch;
+ bool result = false;
+ while (reader.ReadRecord(&r, &scratch)) {
+ if (r == Slice(record)) {
+ result = true;
+ break;
+ }
+ }
+ delete file;
+ Log(options_->info_log, "ManifestContains: result = %d\n", result ? 1 : 0);
+ return result;
+}
+
uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) {
uint64_t result = 0;
for (int level = 0; level < config::kNumLevels; level++) {