Skip to content

Commit

Permalink
[sys-7568] divergence detection for the case of no manifest write gen… (
Browse files Browse the repository at this point in the history
#323)

[sys-7568] divergence detection for the case of no manifest write generated for snapshot epoch
  • Loading branch information
seckcoder authored Apr 12, 2024
1 parent 5699d8c commit 5fe17c7
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 40 deletions.
18 changes: 12 additions & 6 deletions cloud/replication_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ class ReplicationTest : public testing::Test {
}

// Convenience overload: opens the leader using the options produced by
// leaderOptions().
DB* openLeader() {
  return openLeader(leaderOptions());
}
DB* openLeader(Options options);
DB* openLeader(Options options, uint64_t snapshot_replication_epoch = 1);
void closeLeader() {
leader_cfs_.clear();
leader_db_.reset();
Expand Down Expand Up @@ -364,6 +364,7 @@ class ReplicationTest : public testing::Test {
std::unique_ptr<DB> follower_db_;
ColumnFamilyMap follower_cfs_;
std::shared_ptr<Listener> listener_;
uint64_t snapshot_replication_epoch_{0};
};

Options ReplicationTest::leaderOptions() const {
Expand All @@ -382,7 +383,7 @@ Options ReplicationTest::leaderOptions() const {
return options;
}

DB* ReplicationTest::openLeader(Options options) {
DB* ReplicationTest::openLeader(Options options, uint64_t snapshot_replication_epoch) {
bool firstOpen = log_records_.empty();
auto dbname = test_dir_ + "/leader";

Expand Down Expand Up @@ -417,6 +418,7 @@ DB* ReplicationTest::openLeader(Options options) {
assert(inserted);
}

snapshot_replication_epoch_ = snapshot_replication_epoch;
if (!firstOpen) {
MutexLock lock(&log_records_mutex_);
listener_->setState(Listener::RECOVERY);
Expand All @@ -427,7 +429,9 @@ DB* ReplicationTest::openLeader(Options options) {
s = db->ApplyReplicationLogRecord(
log_records_[leaderSeq].first, log_records_[leaderSeq].second,
[this](Slice) { return ColumnFamilyOptions(leaderOptions()); },
true /* allow_new_manifest_writes */, &info,
true /* allow_new_manifest_writes */,
snapshot_replication_epoch_,
&info,
DB::AR_EVICT_OBSOLETE_FILES |
DB::AR_EPOCH_BASED_DIVERGENCE_DETECTION);
assert(s.ok());
Expand Down Expand Up @@ -492,7 +496,9 @@ size_t ReplicationTest::catchUpFollower(std::optional<size_t> num_records,
[this](Slice) {
return ColumnFamilyOptions(follower_db_->GetOptions());
},
allow_new_manifest_writes, &info, flags);
allow_new_manifest_writes,
snapshot_replication_epoch_,
&info, flags);
assert(s.ok());
++ret;
}
Expand Down Expand Up @@ -1470,7 +1476,7 @@ TEST_F(ReplicationTest, MaxNumReplicationEpochs) {
options.disable_auto_flush = true;
// maintain at most two replication epochs in the set
options.max_num_replication_epochs = 2;
auto leader = openLeader(options);
auto leader = openLeader(options, 1 /* snapshot_replication_epoch */);
openFollower(options);

auto cf = [](int i) { return "cf" + std::to_string(i); };
Expand Down Expand Up @@ -1513,7 +1519,7 @@ TEST_F(ReplicationTest, MaxNumReplicationEpochs) {
verifyReplicationEpochsEqual();

closeLeader();
leader = openLeader(options);
leader = openLeader(options, 4 /* snapshot_replication_epoch */);
ASSERT_EQ(leaderFull()->GetVersionSet()->TEST_GetReplicationEpochSet().size(),
2);
ASSERT_EQ(leaderFull()->GetVersionSet()->TEST_GetReplicationEpochSet().GetSmallestEpoch(),
Expand Down
97 changes: 64 additions & 33 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1282,12 +1282,14 @@ Status DBImpl::ApplyReplicationLogRecord(ReplicationLogRecord record,
std::string replication_sequence,
CFOptionsFactory cf_options_factory,
bool allow_new_manifest_writes,
uint64_t snapshot_replication_epoch,
ApplyReplicationLogRecordInfo* info,
unsigned flags) {
JobContext job_context(0, false);
Status s;
bool evictObsoleteFiles = flags & AR_EVICT_OBSOLETE_FILES;
bool enableEpochBasedDivergenceDetection = flags & AR_EPOCH_BASED_DIVERGENCE_DETECTION;
bool enableEpochBasedDivergenceDetection =
flags & AR_EPOCH_BASED_DIVERGENCE_DETECTION;

{
WriteThread::Writer w;
Expand Down Expand Up @@ -1399,21 +1401,34 @@ Status DBImpl::ApplyReplicationLogRecord(ReplicationLogRecord record,
break;
}
latest_applied_update_sequence = e.GetManifestUpdateSequence();
if (e.GetManifestUpdateSequence() <= current_update_sequence) {
// It's possible that applied MUS to be smaller than the current MUS
// in VersionSet when recovering based on local log. We rely on the
// `ReplicationEpochSet` maintained in `VersionSet` to help detect
// diverged local log. NOTE:
// 1. if epoch based divergence detection is
// enabled, `ReplicationEpochSet` can only be empty when this is a
// new db opening with epoch 0, i.e., no new epoch is generated yet,
// and follower starts tailing from leader when epoch is 0.
// Currently, this is only possible in tests. In practice, follower
// will always have non empty replication epoch set when applying
// version edits.
// 2. we can only detect diverged manifest write with mus <= db's
// mus. For mus > db's mus, divergence is only detected when the
// follower connects to leader

// Epoch based divergence detection, used to detect if the local log
// is diverged from the MANIFEST file the db is opened with.
//
// High level idea: we maintain all the (epoch, first mus of the
// epoch) after the persisted replication sequence, i.e., the
// `ReplicationEpochSet`, in VersionSet/Manifest file. When recovering
// local log, we infer the epoch based on the `ReplicationEpochSet`
// and the VersionEdit's manifest update sequence, and compare that
// with the actual epoch of the replication record.
//
// A few special cases:
// 1. if epoch based divergence detection is
// enabled, `ReplicationEpochSet` can only be empty when this is a
// new db opening with epoch 0, i.e., no new epoch is generated yet,
// and follower starts tailing from leader when epoch is 0.
// Currently, this is only possible in tests. In practice, follower
// will always have non empty replication epoch set when applying
// version edits.
// 2. If the mus of `e` is smaller than the smallest mus in the
// `ReplicationEpochSet`, we can't infer the exact epoch. So the only
// verification we do here is to make sure the epoch of the record is
// also smaller than the smallest epoch in the `ReplicationEpochSet`
// 3. We have to do divergence detection even if the mus of `e` is greater
// than `current_update_sequence`. The reason is that a snapshot may be
// generated at a new epoch before any manifest write exists for that
// epoch; in that case the first manifest write applied on top of the
// snapshot must carry `snapshot_replication_epoch`.
if (latest_applied_update_sequence <= current_update_sequence) {
if (enableEpochBasedDivergenceDetection &&
!versions_->IsReplicationEpochsEmpty()) {
auto inferred_epoch_of_mus = versions_->GetReplicationEpochForMUS(
Expand All @@ -1424,34 +1439,50 @@ Status DBImpl::ApplyReplicationLogRecord(ReplicationLogRecord record,
replication_epoch >=
versions_->replication_epochs_.GetSmallestEpoch()) {
info->diverged_manifest_writes = true;
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Diverged manifest found: replication seq: %s, "
"mus: %" PRIu64 ", smallest epoch: %" PRIu64
", actual epoch: %" PRIu64,
replication_sequence.c_str(),
latest_applied_update_sequence,
versions_->replication_epochs_.GetSmallestEpoch(),
replication_epoch);
ROCKS_LOG_INFO(
immutable_db_options_.info_log,
"Diverged manifest found: mus: %" PRIu64
", smallest epoch: %" PRIu64 ", actual epoch: %" PRIu64,
latest_applied_update_sequence,
versions_->replication_epochs_.GetSmallestEpoch(),
replication_epoch);
break;
}
// If we can infer epoch, make sure the epoch actually matches
// with epoch in the `replication_sequence`
if (inferred_epoch_of_mus &&
(*inferred_epoch_of_mus != replication_epoch)) {
info->diverged_manifest_writes = true;
ROCKS_LOG_INFO(
immutable_db_options_.info_log,
"Diverged manifest found: replication seq: %s, mus: "
"%" PRIu64 ", inferred epoch: %" PRIu64
", actual epoch: %" PRIu64,
replication_sequence.c_str(),
latest_applied_update_sequence, *inferred_epoch_of_mus,
replication_epoch);
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Diverged manifest found: mus: %" PRIu64
", inferred epoch: %" PRIu64
", actual epoch: %" PRIu64,
latest_applied_update_sequence,
*inferred_epoch_of_mus, replication_epoch);
break;
}
} // else Old manifest write which is not diverged
}

// don't apply the manifest write if it's already applied
continue;
} else {
if (enableEpochBasedDivergenceDetection &&
!versions_->IsReplicationEpochsEmpty() &&
versions_->replication_epochs_.GetLargestEpoch() < snapshot_replication_epoch) {
// This should be the first mus of `snapshot_replication_epoch`
if (replication_epoch != snapshot_replication_epoch) {
info->diverged_manifest_writes = true;
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Diverged manifest found: mus: %" PRIu64
", replication epoch: %" PRIu64
", snapshot epoch: %" PRIu64,
latest_applied_update_sequence,
replication_epoch, snapshot_replication_epoch);
break;
}
}
}

info->has_new_manifest_writes = true;
if (!allow_new_manifest_writes) {
// We don't expect new manifest writes, break early
Expand Down
1 change: 1 addition & 0 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ class DBImpl : public DB {
std::string replication_sequence,
CFOptionsFactory cf_options_factory,
bool allow_new_manifest_writes,
uint64_t snapshot_replication_epoch,
ApplyReplicationLogRecordInfo* info,
unsigned flags) override;

Expand Down
1 change: 1 addition & 0 deletions db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3437,6 +3437,7 @@ class ModelDB : public DB {
std::string /*replication_sequence*/,
CFOptionsFactory /* cf_options_factory */,
bool /* allow_new_manifest_writes */,
uint64_t /* snapshot_replication_epoch */,
ApplyReplicationLogRecordInfo* /*info*/,
unsigned /*flags*/) override {
return Status::NotSupported("Not supported in Model DB");
Expand Down
6 changes: 6 additions & 0 deletions db/replication_epoch_edit.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <cassert>
#include <cstdint>
#include <deque>
#include <optional>
Expand Down Expand Up @@ -102,8 +103,13 @@ class ReplicationEpochSet {

// Read-only access to the underlying `epochs_` container.
const auto& GetEpochs() const {
  return epochs_;
}
// Returns the epoch of the first entry in `epochs_`.
// Precondition (enforced with assert): `epochs_` is not empty.
uint64_t GetSmallestEpoch() const {
  assert(!epochs_.empty());
  const auto& first_entry = epochs_.front();
  return first_entry.GetEpoch();
}
// Returns the epoch of the last entry in `epochs_`.
// Precondition (enforced with assert): `epochs_` is not empty.
uint64_t GetLargestEpoch() const {
  assert(!epochs_.empty());
  const auto& last_entry = epochs_.back();
  return last_entry.GetEpoch();
}

// True when `epochs_` holds no entries.
bool empty() const {
  return epochs_.empty();
}
// Number of entries currently held in `epochs_`.
auto size() const {
  return epochs_.size();
}
Expand Down
1 change: 1 addition & 0 deletions include/rocksdb/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -1511,6 +1511,7 @@ class DB {
std::string replication_sequence,
CFOptionsFactory cf_options_factory,
bool allow_new_manifest_writes,
uint64_t snapshot_replication_epoch,
ApplyReplicationLogRecordInfo* info,
unsigned flags = 0) = 0;
virtual Status GetReplicationRecordDebugString(
Expand Down
3 changes: 2 additions & 1 deletion include/rocksdb/utilities/stackable_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -560,11 +560,12 @@ class StackableDB : public DB {
std::string replication_sequence,
CFOptionsFactory cf_options_factory,
bool allow_new_manifest_writes,
uint64_t snapshot_replication_epoch,
ApplyReplicationLogRecordInfo* info,
unsigned flags) override {
return db_->ApplyReplicationLogRecord(
record, replication_sequence, std::move(cf_options_factory),
allow_new_manifest_writes, info, flags);
allow_new_manifest_writes, snapshot_replication_epoch, info, flags);
}
Status GetReplicationRecordDebugString(const ReplicationLogRecord& record,
std::string* out) const override {
Expand Down

0 comments on commit 5fe17c7

Please sign in to comment.