Skip to content

Commit

Permalink
Add new Iterator API Refresh(const snapshot*) (facebook#10594)
Browse files Browse the repository at this point in the history
Summary:
This PR resolves facebook#10487 & facebook#10536, user code needs to call Refresh() periodically.

The main code change is to support range deletions. A range tombstone iterator uses a sequence number as upper bound to decide which range tombstones are effective. During Iterator refresh, this sequence number upper bound needs to be updated for all range tombstone iterators under DBIter and LevelIterator. LevelIterator may create new table iterators and range tombstone iterator during scanning, so it needs to be aware of iterator refresh. The code path that propagates this change is `db_iter_->set_sequence(read_seq)  -> MergingIterator::SetRangeDelReadSeqno() -> TruncatedRangeDelIterator::SetRangeDelReadSeqno() and LevelIterator::SetRangeDelReadSeqno()`.

This change also fixes an issue where range tombstone iterators created by LevelIterator may access ReadOptions::snapshot, even though we do not explicitly require users to keep a snapshot alive after creating an Iterator.

Pull Request resolved: facebook#10594

Test Plan:
* New unit tests.
* Add Iterator::Refresh(snapshot) to stress test. Note that this change only adds tests for refreshing to the same snapshot since this is the main target use case.

TODO in a following PR:
* Stress test Iterator::Refresh() to different snapshots or no snapshot.

Reviewed By: ajkr

Differential Revision: D48456896

Pulled By: cbi42

fbshipit-source-id: 2e642c04e91235cc9542ef4cd37b3c20823bd779
  • Loading branch information
rockeet authored and facebook-github-bot committed Sep 15, 2023
1 parent b050751 commit 68ce5d8
Show file tree
Hide file tree
Showing 21 changed files with 335 additions and 58 deletions.
33 changes: 23 additions & 10 deletions db/arena_wrapped_db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@

namespace ROCKSDB_NAMESPACE {

inline static SequenceNumber GetSeqNum(const DBImpl* db, const Snapshot* s) {
if (s) {
return s->GetSequenceNumber();
} else {
return db->GetLatestSequenceNumber();
}
}

Status ArenaWrappedDBIter::GetProperty(std::string prop_name,
std::string* prop) {
if (prop_name == "rocksdb.iterator.super-version-number") {
Expand Down Expand Up @@ -54,7 +62,9 @@ void ArenaWrappedDBIter::Init(
}
}

Status ArenaWrappedDBIter::Refresh() {
Status ArenaWrappedDBIter::Refresh() { return Refresh(nullptr); }

Status ArenaWrappedDBIter::Refresh(const Snapshot* snapshot) {
if (cfd_ == nullptr || db_impl_ == nullptr || !allow_refresh_) {
return Status::NotSupported("Creating renew iterator is not allowed.");
}
Expand All @@ -63,6 +73,10 @@ Status ArenaWrappedDBIter::Refresh() {
// correct behavior. Will be corrected automatically when we take a snapshot
// here for the case of WritePreparedTxnDB.
uint64_t cur_sv_number = cfd_->GetSuperVersionNumber();
// If we recreate a new internal iterator below (NewInternalIterator()),
// we will pass in read_options_. We need to make sure it
// has the right snapshot.
read_options_.snapshot = snapshot;
TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:1");
TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:2");
auto reinit_internal_iter = [&]() {
Expand All @@ -72,18 +86,18 @@ Status ArenaWrappedDBIter::Refresh() {
new (&arena_) Arena();

SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_);
SequenceNumber latest_seq = db_impl_->GetLatestSequenceNumber();
SequenceNumber read_seq = GetSeqNum(db_impl_, snapshot);
if (read_callback_) {
read_callback_->Refresh(latest_seq);
read_callback_->Refresh(read_seq);
}
Init(env, read_options_, *(cfd_->ioptions()), sv->mutable_cf_options,
sv->current, latest_seq,
sv->current, read_seq,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
cur_sv_number, read_callback_, db_impl_, cfd_, expose_blob_index_,
allow_refresh_);

InternalIterator* internal_iter = db_impl_->NewInternalIterator(
read_options_, cfd_, sv, &arena_, latest_seq,
read_options_, cfd_, sv, &arena_, read_seq,
/* allow_unprepared_value */ true, /* db_iter */ this);
SetIterUnderDBIter(internal_iter);
};
Expand All @@ -92,13 +106,13 @@ Status ArenaWrappedDBIter::Refresh() {
reinit_internal_iter();
break;
} else {
SequenceNumber latest_seq = db_impl_->GetLatestSequenceNumber();
SequenceNumber read_seq = GetSeqNum(db_impl_, snapshot);
// Refresh range-tombstones in MemTable
if (!read_options_.ignore_range_deletions) {
SuperVersion* sv = cfd_->GetThreadLocalSuperVersion(db_impl_);
TEST_SYNC_POINT_CALLBACK("ArenaWrappedDBIter::Refresh:SV", nullptr);
auto t = sv->mem->NewRangeTombstoneIterator(
read_options_, latest_seq, false /* immutable_memtable */);
read_options_, read_seq, false /* immutable_memtable */);
if (!t || t->empty()) {
// If memtable_range_tombstone_iter_ points to a non-empty tombstone
// iterator, then it means sv->mem is not the memtable that
Expand Down Expand Up @@ -128,9 +142,6 @@ Status ArenaWrappedDBIter::Refresh() {
}
db_impl_->ReturnAndCleanupSuperVersion(cfd_, sv);
}
// Refresh latest sequence number
db_iter_->set_sequence(latest_seq);
db_iter_->set_valid(false);
// Check again if the latest super version number is changed
uint64_t latest_sv_number = cfd_->GetSuperVersionNumber();
if (latest_sv_number != cur_sv_number) {
Expand All @@ -139,6 +150,8 @@ Status ArenaWrappedDBIter::Refresh() {
cur_sv_number = latest_sv_number;
continue;
}
db_iter_->set_sequence(read_seq);
db_iter_->set_valid(false);
break;
}
}
Expand Down
1 change: 1 addition & 0 deletions db/arena_wrapped_db_iter.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class ArenaWrappedDBIter : public Iterator {
Status GetProperty(std::string prop_name, std::string* prop) override;

Status Refresh() override;
Status Refresh(const Snapshot*) override;

void Init(Env* env, const ReadOptions& read_options,
const ImmutableOptions& ioptions,
Expand Down
2 changes: 1 addition & 1 deletion db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3650,7 +3650,7 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, sv->current,
snapshot, sv->mutable_cf_options.max_sequential_skip_in_iterations,
sv->version_number, read_callback, this, cfd, expose_blob_index,
read_options.snapshot != nullptr ? false : allow_refresh);
allow_refresh);

InternalIterator* internal_iter = NewInternalIterator(
db_iter->GetReadOptions(), cfd, sv, db_iter->GetArena(), snapshot,
Expand Down
2 changes: 1 addition & 1 deletion db/db_impl/db_impl_secondary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ ArenaWrappedDBIter* DBImplSecondary::NewIteratorImpl(
super_version->current, snapshot,
super_version->mutable_cf_options.max_sequential_skip_in_iterations,
super_version->version_number, read_callback, this, cfd,
expose_blob_index, read_options.snapshot ? false : allow_refresh);
expose_blob_index, allow_refresh);
auto internal_iter = NewInternalIterator(
db_iter->GetReadOptions(), cfd, super_version, db_iter->GetArena(),
snapshot, /* allow_unprepared_value */ true, db_iter);
Expand Down
1 change: 1 addition & 0 deletions db/db_iter.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ class DBIter final : public Iterator {
if (read_callback_) {
read_callback_->Refresh(s);
}
iter_.SetRangeDelReadSeqno(s);
}
void set_valid(bool v) { valid_ = v; }

Expand Down
92 changes: 76 additions & 16 deletions db/db_iterator_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2421,32 +2421,92 @@ TEST_P(DBIteratorTest, Refresh) {
}

TEST_P(DBIteratorTest, RefreshWithSnapshot) {
ASSERT_OK(Put("x", "y"));
// L1 file, uses LevelIterator internally
ASSERT_OK(Put(Key(0), "val0"));
ASSERT_OK(Put(Key(5), "val5"));
ASSERT_OK(Flush());
MoveFilesToLevel(1);

// L0 file, uses table iterator internally
ASSERT_OK(Put(Key(1), "val1"));
ASSERT_OK(Put(Key(4), "val4"));
ASSERT_OK(Flush());

// Memtable
ASSERT_OK(Put(Key(2), "val2"));
ASSERT_OK(Put(Key(3), "val3"));
const Snapshot* snapshot = db_->GetSnapshot();
ASSERT_OK(Put(Key(2), "new val"));
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(4),
Key(7)));
const Snapshot* snapshot2 = db_->GetSnapshot();

ASSERT_EQ(1, NumTableFilesAtLevel(1));
ASSERT_EQ(1, NumTableFilesAtLevel(0));

ReadOptions options;
options.snapshot = snapshot;
Iterator* iter = NewIterator(options);
ASSERT_OK(Put(Key(6), "val6"));
ASSERT_OK(iter->status());

iter->Seek(Slice("a"));
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().compare(Slice("x")), 0);
iter->Next();
ASSERT_FALSE(iter->Valid());
auto verify_iter = [&](int start, int end, bool new_key2 = false) {
for (int i = start; i < end; ++i) {
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key(), Key(i));
if (i == 2 && new_key2) {
ASSERT_EQ(iter->value(), "new val");
} else {
ASSERT_EQ(iter->value(), "val" + std::to_string(i));
}
iter->Next();
}
};

ASSERT_OK(Put("c", "d"));
for (int j = 0; j < 2; j++) {
iter->Seek(Key(1));
verify_iter(1, 3);
// Refresh to same snapshot
ASSERT_OK(iter->Refresh(snapshot));
ASSERT_TRUE(iter->status().ok() && !iter->Valid());
iter->Seek(Key(3));
verify_iter(3, 6);
ASSERT_TRUE(iter->status().ok() && !iter->Valid());

// Refresh to a newer snapshot
ASSERT_OK(iter->Refresh(snapshot2));
ASSERT_TRUE(iter->status().ok() && !iter->Valid());
iter->SeekToFirst();
verify_iter(0, 4, /*new_key2=*/true);
ASSERT_TRUE(iter->status().ok() && !iter->Valid());

// Refresh to an older snapshot
ASSERT_OK(iter->Refresh(snapshot));
ASSERT_TRUE(iter->status().ok() && !iter->Valid());
iter->Seek(Key(3));
verify_iter(3, 6);
ASSERT_TRUE(iter->status().ok() && !iter->Valid());

// Refresh to no snapshot
ASSERT_OK(iter->Refresh());
ASSERT_TRUE(iter->status().ok() && !iter->Valid());
iter->Seek(Key(2));
verify_iter(2, 4, /*new_key2=*/true);
verify_iter(6, 7);
ASSERT_TRUE(iter->status().ok() && !iter->Valid());

// Change LSM shape, new SuperVersion is created.
ASSERT_OK(Flush());

iter->Seek(Slice("a"));
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().compare(Slice("x")), 0);
iter->Next();
ASSERT_FALSE(iter->Valid());
// Refresh back to original snapshot
ASSERT_OK(iter->Refresh(snapshot));
}

ASSERT_OK(iter->status());
Status s = iter->Refresh();
ASSERT_TRUE(s.IsNotSupported());
db_->ReleaseSnapshot(snapshot);
delete iter;
db_->ReleaseSnapshot(snapshot);
db_->ReleaseSnapshot(snapshot2);
ASSERT_OK(db_->Close());
}

TEST_P(DBIteratorTest, CreationFailure) {
Expand Down
118 changes: 118 additions & 0 deletions db/db_range_del_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3643,6 +3643,124 @@ TEST_F(DBRangeDelTest, RangeDelReseekAfterFileReadError) {

iter.reset();
}

TEST_F(DBRangeDelTest, ReleaseSnapshotAfterIteratorCreation) {
// Test that range tombstone code path in LevelIterator
// does access ReadOptions::snapshot after Iterator creation.
//
// Put some data in L2 so that range tombstone in L1 will not be dropped.
ASSERT_OK(Put(Key(0), "v"));
ASSERT_OK(Put(Key(100), "v"));
ASSERT_OK(Flush());
MoveFilesToLevel(2);

// two L1 file with range del
ASSERT_OK(Put(Key(1), "v"));
ASSERT_OK(Put(Key(2), "v"));
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(3),
Key(4)));
ASSERT_OK(Flush());
MoveFilesToLevel(1);

ASSERT_OK(Put(Key(5), "v"));
ASSERT_OK(Put(Key(6), "v"));
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(5),
Key(6)));
ASSERT_OK(Flush());
MoveFilesToLevel(1);

ASSERT_EQ(2, NumTableFilesAtLevel(1));
ASSERT_EQ(1, NumTableFilesAtLevel(2));

const Snapshot* snapshot = db_->GetSnapshot();
ReadOptions ro;
ro.snapshot = snapshot;

Iterator* iter = db_->NewIterator(ro);
db_->ReleaseSnapshot(snapshot);

iter->Seek(Key(1));
std::vector<int> expected_keys{1, 2, 6, 100};
for (int i : expected_keys) {
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key(), Key(i));
iter->Next();
}
ASSERT_TRUE(!iter->Valid() && iter->status().ok());

delete iter;
}

TEST_F(DBRangeDelTest, RefreshWithSnapshot) {
ASSERT_OK(Put(Key(4), "4"));
ASSERT_OK(Put(Key(6), "6"));
const Snapshot* snapshot = db_->GetSnapshot();
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(3),
Key(5)));

std::unique_ptr<Iterator> iter{db_->NewIterator(ReadOptions())};
// Live Memtable
iter->SeekToFirst();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key(), Key(6));
ASSERT_OK(iter->Refresh(snapshot));
iter->SeekToFirst();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key(), Key(4));
// Immutable Memtable
ASSERT_OK(dbfull()->TEST_SwitchMemtable());
ASSERT_OK(iter->Refresh(nullptr));
iter->SeekToFirst();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key(), Key(6));
ASSERT_OK(iter->Refresh(snapshot));
iter->SeekToFirst();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key(), Key(4));
// L0
ASSERT_OK(Flush());
ASSERT_EQ(1, NumTableFilesAtLevel(0));
ASSERT_OK(iter->Refresh(nullptr));
iter->SeekToFirst();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key(), Key(6));
ASSERT_OK(iter->Refresh(snapshot));
iter->SeekToFirst();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key(), Key(4));
// L1
MoveFilesToLevel(1);
ASSERT_EQ(1, NumTableFilesAtLevel(1));
ASSERT_OK(iter->Refresh(nullptr));
iter->SeekToFirst();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key(), Key(6));
ASSERT_OK(iter->Refresh(snapshot));
iter->SeekToFirst();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key(), Key(4));
// L1 with two file.
// Test that when LevelIterator enters a new file,
// it remembers which snapshot sequence number to use.
ASSERT_OK(Put(Key(2), "2"));
ASSERT_OK(Flush());
MoveFilesToLevel(1);
ASSERT_EQ(2, NumTableFilesAtLevel(1));
ASSERT_OK(iter->Refresh(nullptr));
iter->SeekToFirst();
ASSERT_TRUE(iter->Valid());
// LevelIterator is at the first file
ASSERT_EQ(iter->key(), Key(2));
ASSERT_OK(iter->Refresh(snapshot));
// Will enter the second file, and create a new range tombstone iterator.
// It should use the snapshot sequence number.
iter->SeekToFirst();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key(), Key(4));
iter.reset();
db_->ReleaseSnapshot(snapshot);
}
} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
Expand Down
4 changes: 4 additions & 0 deletions db/range_del_aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ class TruncatedRangeDelIterator {
const InternalKeyComparator* icmp, const InternalKey* smallest,
const InternalKey* largest);

void SetRangeDelReadSeqno(SequenceNumber read_seqno) {
iter_->SetRangeDelReadSeqno(read_seqno);
}

bool Valid() const;

void Next() { iter_->TopNext(); }
Expand Down
4 changes: 4 additions & 0 deletions db/range_tombstone_fragmenter.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ class FragmentedRangeTombstoneIterator : public InternalIterator {
const InternalKeyComparator& icmp, SequenceNumber upper_bound,
const Slice* ts_upper_bound = nullptr, SequenceNumber lower_bound = 0);

void SetRangeDelReadSeqno(SequenceNumber read_seqno) override {
upper_bound_ = read_seqno;
}

void SeekToFirst() override;
void SeekToLast() override;

Expand Down
Loading

0 comments on commit 68ce5d8

Please sign in to comment.