From 68ce5d84f6a58d37e3725607385e1c68cb1254a4 Mon Sep 17 00:00:00 2001
From: leipeng
Date: Fri, 15 Sep 2023 10:44:43 -0700
Subject: [PATCH] Add new Iterator API Refresh(const snapshot*) (#10594)

Summary:
This PR resolves https://github.com/facebook/rocksdb/issues/10487 and
https://github.com/facebook/rocksdb/issues/10536, where user code needs to
call Refresh() periodically.

The main code change is to support range deletions. A range tombstone
iterator uses a sequence number as upper bound to decide which range
tombstones are effective. During Iterator refresh, this sequence number
upper bound needs to be updated for all range tombstone iterators under
DBIter and LevelIterator. LevelIterator may create new table iterators and
range tombstone iterators during scanning, so it needs to be aware of
iterator refresh. The code path that propagates this change is
`db_iter_->set_sequence(read_seq) ->
MergingIterator::SetRangeDelReadSeqno() ->
TruncatedRangeDelIterator::SetRangeDelReadSeqno() and
LevelIterator::SetRangeDelReadSeqno()`.

This change also fixes an issue where range tombstone iterators created by
LevelIterator may access ReadOptions::snapshot, even though we do not
explicitly require users to keep a snapshot alive after creating an
Iterator.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10594

Test Plan:
* New unit tests.
* Add Iterator::Refresh(snapshot) to stress test. Note that this change
  only adds tests for refreshing to the same snapshot, since this is the
  main target use case.

TODO in a following PR:
* Stress test Iterator::Refresh() to different snapshots or no snapshot.

Reviewed By: ajkr

Differential Revision: D48456896

Pulled By: cbi42

fbshipit-source-id: 2e642c04e91235cc9542ef4cd37b3c20823bd779
---
 db/arena_wrapped_db_iter.cc                   |  33 +++--
 db/arena_wrapped_db_iter.h                    |   1 +
 db/db_impl/db_impl.cc                         |   2 +-
 db/db_impl/db_impl_secondary.cc               |   2 +-
 db/db_iter.h                                  |   1 +
 db/db_iterator_test.cc                        |  92 +++++++++++---
 db/db_range_del_test.cc                       | 118 ++++++++++++++++++
 db/range_del_aggregator.h                     |   4 +
 db/range_tombstone_fragmenter.h               |   4 +
 db/table_cache.cc                             |   6 +-
 db/table_cache.h                              |   3 +
 db/version_set.cc                             |  57 +++++----
 db_stress_tool/db_stress_test_base.cc         |   8 ++
 include/rocksdb/iterator.h                    |  14 ++-
 table/block_based/block_based_table_reader.cc |  10 ++
 table/block_based/block_based_table_reader.h  |   3 +
 table/internal_iterator.h                     |  11 ++
 table/iterator_wrapper.h                      |   5 +
 table/merging_iterator.cc                     |  12 ++
 table/table_reader.h                          |   6 +
 .../new_features/iterator-refresh-snapshot.md |   1 +
 21 files changed, 335 insertions(+), 58 deletions(-)
 create mode 100644 unreleased_history/new_features/iterator-refresh-snapshot.md

diff --git a/db/arena_wrapped_db_iter.cc b/db/arena_wrapped_db_iter.cc
index b101fbbc75d..865b1ad2eb3 100644
--- a/db/arena_wrapped_db_iter.cc
+++ b/db/arena_wrapped_db_iter.cc
@@ -19,6 +19,14 @@
 
 namespace ROCKSDB_NAMESPACE {
 
+inline static SequenceNumber GetSeqNum(const DBImpl* db, const Snapshot* s) {
+  if (s) {
+    return s->GetSequenceNumber();
+  } else {
+    return db->GetLatestSequenceNumber();
+  }
+}
+
 Status ArenaWrappedDBIter::GetProperty(std::string prop_name,
                                        std::string* prop) {
   if (prop_name == "rocksdb.iterator.super-version-number") {
@@ -54,7 +62,9 @@ void ArenaWrappedDBIter::Init(
   }
 }
 
-Status ArenaWrappedDBIter::Refresh() {
+Status ArenaWrappedDBIter::Refresh() { return Refresh(nullptr); }
+
+Status ArenaWrappedDBIter::Refresh(const Snapshot* snapshot) {
   if (cfd_ == nullptr || db_impl_ == nullptr || !allow_refresh_) {
     return Status::NotSupported("Creating renew iterator is not allowed.");
   }
allowed."); } @@ -63,6 +73,10 @@ Status ArenaWrappedDBIter::Refresh() { // correct behavior. Will be corrected automatically when we take a snapshot // here for the case of WritePreparedTxnDB. uint64_t cur_sv_number = cfd_->GetSuperVersionNumber(); + // If we recreate a new internal iterator below (NewInternalIterator()), + // we will pass in read_options_. We need to make sure it + // has the right snapshot. + read_options_.snapshot = snapshot; TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:1"); TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:2"); auto reinit_internal_iter = [&]() { @@ -72,18 +86,18 @@ Status ArenaWrappedDBIter::Refresh() { new (&arena_) Arena(); SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_); - SequenceNumber latest_seq = db_impl_->GetLatestSequenceNumber(); + SequenceNumber read_seq = GetSeqNum(db_impl_, snapshot); if (read_callback_) { - read_callback_->Refresh(latest_seq); + read_callback_->Refresh(read_seq); } Init(env, read_options_, *(cfd_->ioptions()), sv->mutable_cf_options, - sv->current, latest_seq, + sv->current, read_seq, sv->mutable_cf_options.max_sequential_skip_in_iterations, cur_sv_number, read_callback_, db_impl_, cfd_, expose_blob_index_, allow_refresh_); InternalIterator* internal_iter = db_impl_->NewInternalIterator( - read_options_, cfd_, sv, &arena_, latest_seq, + read_options_, cfd_, sv, &arena_, read_seq, /* allow_unprepared_value */ true, /* db_iter */ this); SetIterUnderDBIter(internal_iter); }; @@ -92,13 +106,13 @@ Status ArenaWrappedDBIter::Refresh() { reinit_internal_iter(); break; } else { - SequenceNumber latest_seq = db_impl_->GetLatestSequenceNumber(); + SequenceNumber read_seq = GetSeqNum(db_impl_, snapshot); // Refresh range-tombstones in MemTable if (!read_options_.ignore_range_deletions) { SuperVersion* sv = cfd_->GetThreadLocalSuperVersion(db_impl_); TEST_SYNC_POINT_CALLBACK("ArenaWrappedDBIter::Refresh:SV", nullptr); auto t = sv->mem->NewRangeTombstoneIterator( - read_options_, latest_seq, false /* immutable_memtable */); + read_options_, read_seq, false /* immutable_memtable */); if (!t || t->empty()) { // If memtable_range_tombstone_iter_ points to a non-empty tombstone // iterator, then it means sv->mem is not the memtable that @@ -128,9 +142,6 @@ Status ArenaWrappedDBIter::Refresh() { } db_impl_->ReturnAndCleanupSuperVersion(cfd_, sv); } - // Refresh latest sequence number - db_iter_->set_sequence(latest_seq); - db_iter_->set_valid(false); // Check again if the latest super version number is changed uint64_t latest_sv_number = cfd_->GetSuperVersionNumber(); if (latest_sv_number != cur_sv_number) { @@ -139,6 +150,8 @@ Status ArenaWrappedDBIter::Refresh() { cur_sv_number = latest_sv_number; continue; } + db_iter_->set_sequence(read_seq); + db_iter_->set_valid(false); break; } } diff --git a/db/arena_wrapped_db_iter.h b/db/arena_wrapped_db_iter.h index f15be306d22..678ea3e78d7 100644 --- a/db/arena_wrapped_db_iter.h +++ b/db/arena_wrapped_db_iter.h @@ -80,6 +80,7 @@ class ArenaWrappedDBIter : public Iterator { Status GetProperty(std::string prop_name, std::string* prop) override; Status Refresh() override; + Status Refresh(const Snapshot*) override; void Init(Env* env, const ReadOptions& read_options, const ImmutableOptions& ioptions, diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 52dba4e5638..f546826a3a1 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -3650,7 +3650,7 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl( env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, 
       sv->current, snapshot,
       sv->mutable_cf_options.max_sequential_skip_in_iterations,
       sv->version_number, read_callback, this, cfd, expose_blob_index,
-      read_options.snapshot != nullptr ? false : allow_refresh);
+      allow_refresh);
   InternalIterator* internal_iter = NewInternalIterator(
       db_iter->GetReadOptions(), cfd, sv, db_iter->GetArena(), snapshot,
diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc
index e6dd8e08080..10680ba1eca 100644
--- a/db/db_impl/db_impl_secondary.cc
+++ b/db/db_impl/db_impl_secondary.cc
@@ -535,7 +535,7 @@ ArenaWrappedDBIter* DBImplSecondary::NewIteratorImpl(
       super_version->current, snapshot,
       super_version->mutable_cf_options.max_sequential_skip_in_iterations,
       super_version->version_number, read_callback, this, cfd,
-      expose_blob_index, read_options.snapshot ? false : allow_refresh);
+      expose_blob_index, allow_refresh);
   auto internal_iter = NewInternalIterator(
       db_iter->GetReadOptions(), cfd, super_version, db_iter->GetArena(),
       snapshot, /* allow_unprepared_value */ true, db_iter);
diff --git a/db/db_iter.h b/db/db_iter.h
index 163da32650c..e45da9dd1bd 100644
--- a/db/db_iter.h
+++ b/db/db_iter.h
@@ -209,6 +209,7 @@ class DBIter final : public Iterator {
     if (read_callback_) {
       read_callback_->Refresh(s);
     }
+    iter_.SetRangeDelReadSeqno(s);
   }
 
   void set_valid(bool v) { valid_ = v; }
diff --git a/db/db_iterator_test.cc b/db/db_iterator_test.cc
index c982fcff18d..4df3448f963 100644
--- a/db/db_iterator_test.cc
+++ b/db/db_iterator_test.cc
@@ -2421,32 +2421,92 @@ TEST_P(DBIteratorTest, Refresh) {
 }
 
 TEST_P(DBIteratorTest, RefreshWithSnapshot) {
-  ASSERT_OK(Put("x", "y"));
+  // L1 file, uses LevelIterator internally
+  ASSERT_OK(Put(Key(0), "val0"));
+  ASSERT_OK(Put(Key(5), "val5"));
+  ASSERT_OK(Flush());
+  MoveFilesToLevel(1);
+
+  // L0 file, uses table iterator internally
+  ASSERT_OK(Put(Key(1), "val1"));
+  ASSERT_OK(Put(Key(4), "val4"));
+  ASSERT_OK(Flush());
+
+  // Memtable
+  ASSERT_OK(Put(Key(2), "val2"));
+  ASSERT_OK(Put(Key(3), "val3"));
   const Snapshot* snapshot = db_->GetSnapshot();
+  ASSERT_OK(Put(Key(2), "new val"));
+  ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(4),
+                             Key(7)));
+  const Snapshot* snapshot2 = db_->GetSnapshot();
+
+  ASSERT_EQ(1, NumTableFilesAtLevel(1));
+  ASSERT_EQ(1, NumTableFilesAtLevel(0));
+
   ReadOptions options;
   options.snapshot = snapshot;
   Iterator* iter = NewIterator(options);
+  ASSERT_OK(Put(Key(6), "val6"));
   ASSERT_OK(iter->status());
-  iter->Seek(Slice("a"));
-  ASSERT_TRUE(iter->Valid());
-  ASSERT_EQ(iter->key().compare(Slice("x")), 0);
-  iter->Next();
-  ASSERT_FALSE(iter->Valid());
+  auto verify_iter = [&](int start, int end, bool new_key2 = false) {
+    for (int i = start; i < end; ++i) {
+      ASSERT_OK(iter->status());
+      ASSERT_TRUE(iter->Valid());
+      ASSERT_EQ(iter->key(), Key(i));
+      if (i == 2 && new_key2) {
+        ASSERT_EQ(iter->value(), "new val");
+      } else {
+        ASSERT_EQ(iter->value(), "val" + std::to_string(i));
+      }
+      iter->Next();
+    }
+  };
 
-  ASSERT_OK(Put("c", "d"));
+  for (int j = 0; j < 2; j++) {
+    iter->Seek(Key(1));
+    verify_iter(1, 3);
+    // Refresh to same snapshot
+    ASSERT_OK(iter->Refresh(snapshot));
+    ASSERT_TRUE(iter->status().ok() && !iter->Valid());
+    iter->Seek(Key(3));
+    verify_iter(3, 6);
+    ASSERT_TRUE(iter->status().ok() && !iter->Valid());
+
+    // Refresh to a newer snapshot
+    ASSERT_OK(iter->Refresh(snapshot2));
+    ASSERT_TRUE(iter->status().ok() && !iter->Valid());
+    iter->SeekToFirst();
+    verify_iter(0, 4, /*new_key2=*/true);
+    ASSERT_TRUE(iter->status().ok() && !iter->Valid());
+
+    // Refresh to an older snapshot
+    ASSERT_OK(iter->Refresh(snapshot));
+    ASSERT_TRUE(iter->status().ok() && !iter->Valid());
+    iter->Seek(Key(3));
+    verify_iter(3, 6);
+    ASSERT_TRUE(iter->status().ok() && !iter->Valid());
+
+    // Refresh to no snapshot
+    ASSERT_OK(iter->Refresh());
+    ASSERT_TRUE(iter->status().ok() && !iter->Valid());
+    iter->Seek(Key(2));
+    verify_iter(2, 4, /*new_key2=*/true);
+    verify_iter(6, 7);
+    ASSERT_TRUE(iter->status().ok() && !iter->Valid());
+
+    // Change LSM shape, new SuperVersion is created.
+    ASSERT_OK(Flush());
 
-  iter->Seek(Slice("a"));
-  ASSERT_TRUE(iter->Valid());
-  ASSERT_EQ(iter->key().compare(Slice("x")), 0);
-  iter->Next();
-  ASSERT_FALSE(iter->Valid());
+    // Refresh back to original snapshot
+    ASSERT_OK(iter->Refresh(snapshot));
+  }
 
-  ASSERT_OK(iter->status());
-  Status s = iter->Refresh();
-  ASSERT_TRUE(s.IsNotSupported());
-  db_->ReleaseSnapshot(snapshot);
   delete iter;
+  db_->ReleaseSnapshot(snapshot);
+  db_->ReleaseSnapshot(snapshot2);
+  ASSERT_OK(db_->Close());
 }
 
 TEST_P(DBIteratorTest, CreationFailure) {
diff --git a/db/db_range_del_test.cc b/db/db_range_del_test.cc
index a19912aa690..2e93f96d719 100644
--- a/db/db_range_del_test.cc
+++ b/db/db_range_del_test.cc
@@ -3643,6 +3643,124 @@ TEST_F(DBRangeDelTest, RangeDelReseekAfterFileReadError) {
   iter.reset();
 }
+
+TEST_F(DBRangeDelTest, ReleaseSnapshotAfterIteratorCreation) {
+  // Test that range tombstone code path in LevelIterator
+  // does not access ReadOptions::snapshot after Iterator creation.
+  //
+  // Put some data in L2 so that range tombstone in L1 will not be dropped.
+  ASSERT_OK(Put(Key(0), "v"));
+  ASSERT_OK(Put(Key(100), "v"));
+  ASSERT_OK(Flush());
+  MoveFilesToLevel(2);
+
+  // two L1 files with range del
+  ASSERT_OK(Put(Key(1), "v"));
+  ASSERT_OK(Put(Key(2), "v"));
+  ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(3),
+                             Key(4)));
+  ASSERT_OK(Flush());
+  MoveFilesToLevel(1);
+
+  ASSERT_OK(Put(Key(5), "v"));
+  ASSERT_OK(Put(Key(6), "v"));
+  ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(5),
+                             Key(6)));
+  ASSERT_OK(Flush());
+  MoveFilesToLevel(1);
+
+  ASSERT_EQ(2, NumTableFilesAtLevel(1));
+  ASSERT_EQ(1, NumTableFilesAtLevel(2));
+
+  const Snapshot* snapshot = db_->GetSnapshot();
+  ReadOptions ro;
+  ro.snapshot = snapshot;
+
+  Iterator* iter = db_->NewIterator(ro);
+  db_->ReleaseSnapshot(snapshot);
+
+  iter->Seek(Key(1));
+  std::vector<int> expected_keys{1, 2, 6, 100};
+  for (int i : expected_keys) {
+    ASSERT_OK(iter->status());
+    ASSERT_TRUE(iter->Valid());
+    ASSERT_EQ(iter->key(), Key(i));
+    iter->Next();
+  }
+  ASSERT_TRUE(!iter->Valid() && iter->status().ok());
+
+  delete iter;
+}
+
+TEST_F(DBRangeDelTest, RefreshWithSnapshot) {
+  ASSERT_OK(Put(Key(4), "4"));
+  ASSERT_OK(Put(Key(6), "6"));
+  const Snapshot* snapshot = db_->GetSnapshot();
+  ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), Key(3),
+                             Key(5)));
+
+  std::unique_ptr<Iterator> iter{db_->NewIterator(ReadOptions())};
+  // Live Memtable
+  iter->SeekToFirst();
+  ASSERT_TRUE(iter->Valid());
+  ASSERT_EQ(iter->key(), Key(6));
+  ASSERT_OK(iter->Refresh(snapshot));
+  iter->SeekToFirst();
+  ASSERT_TRUE(iter->Valid());
+  ASSERT_EQ(iter->key(), Key(4));
+  // Immutable Memtable
+  ASSERT_OK(dbfull()->TEST_SwitchMemtable());
+  ASSERT_OK(iter->Refresh(nullptr));
+  iter->SeekToFirst();
+  ASSERT_TRUE(iter->Valid());
+  ASSERT_EQ(iter->key(), Key(6));
+  ASSERT_OK(iter->Refresh(snapshot));
+  iter->SeekToFirst();
+  ASSERT_TRUE(iter->Valid());
+  ASSERT_EQ(iter->key(), Key(4));
+  // L0
+  ASSERT_OK(Flush());
+  ASSERT_EQ(1, NumTableFilesAtLevel(0));
+  ASSERT_OK(iter->Refresh(nullptr));
+  iter->SeekToFirst();
+  ASSERT_TRUE(iter->Valid());
+  ASSERT_EQ(iter->key(), Key(6));
+  ASSERT_OK(iter->Refresh(snapshot));
+  iter->SeekToFirst();
+  ASSERT_TRUE(iter->Valid());
+  ASSERT_EQ(iter->key(), Key(4));
+  // L1
+  MoveFilesToLevel(1);
+  ASSERT_EQ(1, NumTableFilesAtLevel(1));
+  ASSERT_OK(iter->Refresh(nullptr));
+  iter->SeekToFirst();
+  ASSERT_TRUE(iter->Valid());
+  ASSERT_EQ(iter->key(), Key(6));
+  ASSERT_OK(iter->Refresh(snapshot));
+  iter->SeekToFirst();
+  ASSERT_TRUE(iter->Valid());
+  ASSERT_EQ(iter->key(), Key(4));
+  // L1 with two files.
+  // Test that when LevelIterator enters a new file,
+  // it remembers which snapshot sequence number to use.
+  ASSERT_OK(Put(Key(2), "2"));
+  ASSERT_OK(Flush());
+  MoveFilesToLevel(1);
+  ASSERT_EQ(2, NumTableFilesAtLevel(1));
+  ASSERT_OK(iter->Refresh(nullptr));
+  iter->SeekToFirst();
+  ASSERT_TRUE(iter->Valid());
+  // LevelIterator is at the first file
+  ASSERT_EQ(iter->key(), Key(2));
+  ASSERT_OK(iter->Refresh(snapshot));
+  // Will enter the second file, and create a new range tombstone iterator.
+  // It should use the snapshot sequence number.
+  iter->SeekToFirst();
+  ASSERT_TRUE(iter->Valid());
+  ASSERT_EQ(iter->key(), Key(4));
+  iter.reset();
+  db_->ReleaseSnapshot(snapshot);
+}
 }  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {
diff --git a/db/range_del_aggregator.h b/db/range_del_aggregator.h
index dc1e730389b..f7fa87af40d 100644
--- a/db/range_del_aggregator.h
+++ b/db/range_del_aggregator.h
@@ -36,6 +36,10 @@ class TruncatedRangeDelIterator {
                             const InternalKeyComparator* icmp,
                             const InternalKey* smallest,
                             const InternalKey* largest);
 
+  void SetRangeDelReadSeqno(SequenceNumber read_seqno) {
+    iter_->SetRangeDelReadSeqno(read_seqno);
+  }
+
   bool Valid() const;
 
   void Next() { iter_->TopNext(); }
diff --git a/db/range_tombstone_fragmenter.h b/db/range_tombstone_fragmenter.h
index 8c7d9829728..ce631d495e6 100644
--- a/db/range_tombstone_fragmenter.h
+++ b/db/range_tombstone_fragmenter.h
@@ -148,6 +148,10 @@ class FragmentedRangeTombstoneIterator : public InternalIterator {
       const InternalKeyComparator& icmp, SequenceNumber upper_bound,
       const Slice* ts_upper_bound = nullptr, SequenceNumber lower_bound = 0);
 
+  void SetRangeDelReadSeqno(SequenceNumber read_seqno) override {
+    upper_bound_ = read_seqno;
+  }
+
   void SeekToFirst() override;
   void SeekToLast() override;
diff --git a/db/table_cache.cc b/db/table_cache.cc
index 2a4f3350530..8b3bc50df33 100644
--- a/db/table_cache.cc
+++ b/db/table_cache.cc
@@ -224,7 +224,7 @@ InternalIterator* TableCache::NewIterator(
     size_t max_file_size_for_l0_meta_pin,
     const InternalKey* smallest_compaction_key,
     const InternalKey* largest_compaction_key, bool allow_unprepared_value,
-    uint8_t block_protection_bytes_per_key,
+    uint8_t block_protection_bytes_per_key, const SequenceNumber* read_seqno,
     TruncatedRangeDelIterator** range_del_iter) {
   PERF_TIMER_GUARD(new_table_iterator_nanos);
 
@@ -273,7 +273,9 @@ InternalIterator* TableCache::NewIterator(
   if (s.ok() && !options.ignore_range_deletions) {
     if (range_del_iter != nullptr) {
       auto new_range_del_iter =
-          table_reader->NewRangeTombstoneIterator(options);
+          read_seqno ? table_reader->NewRangeTombstoneIterator(
+                           *read_seqno, options.timestamp)
+                     : table_reader->NewRangeTombstoneIterator(options);
       if (new_range_del_iter == nullptr || new_range_del_iter->empty()) {
         delete new_range_del_iter;
         *range_del_iter = nullptr;
diff --git a/db/table_cache.h b/db/table_cache.h
index 39e41cc6c91..67d36d8051a 100644
--- a/db/table_cache.h
+++ b/db/table_cache.h
@@ -86,6 +86,8 @@ class TableCache {
   // not cached), depending on the CF options
   // @param skip_filters Disables loading/accessing the filter block
   // @param level The level this table is at, -1 for "not set / don't know"
+  // @param range_del_read_seqno If non-nullptr, will be used to create
+  // *range_del_iter.
   InternalIterator* NewIterator(
       const ReadOptions& options, const FileOptions& toptions,
       const InternalKeyComparator& internal_comparator,
@@ -97,6 +99,7 @@ class TableCache {
       const InternalKey* smallest_compaction_key,
       const InternalKey* largest_compaction_key, bool allow_unprepared_value,
       uint8_t protection_bytes_per_key,
+      const SequenceNumber* range_del_read_seqno = nullptr,
       TruncatedRangeDelIterator** range_del_iter = nullptr);
 
   // If a seek to internal key "k" in specified file finds an entry,
diff --git a/db/version_set.cc b/db/version_set.cc
index 572da83e382..ef6d3094407 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -957,18 +957,21 @@ class LevelIterator final : public InternalIterator {
        flevel_(flevel),
        prefix_extractor_(prefix_extractor),
        file_read_hist_(file_read_hist),
-        should_sample_(should_sample),
        caller_(caller),
-        skip_filters_(skip_filters),
-        allow_unprepared_value_(allow_unprepared_value),
        file_index_(flevel_->num_files),
-        level_(level),
        range_del_agg_(range_del_agg),
        pinned_iters_mgr_(nullptr),
        compaction_boundaries_(compaction_boundaries),
-        is_next_read_sequential_(false),
-        block_protection_bytes_per_key_(block_protection_bytes_per_key),
        range_tombstone_iter_(nullptr),
+        read_seq_(read_options.snapshot
+                      ? read_options.snapshot->GetSequenceNumber()
+                      : kMaxSequenceNumber),
+        level_(level),
+        block_protection_bytes_per_key_(block_protection_bytes_per_key),
+        should_sample_(should_sample),
+        skip_filters_(skip_filters),
+        allow_unprepared_value_(allow_unprepared_value),
+        is_next_read_sequential_(false),
        to_return_sentinel_(false) {
    // Empty level is not supported.
    assert(flevel_ != nullptr && flevel_->num_files > 0);
@@ -1056,6 +1059,10 @@
   bool IsDeleteRangeSentinelKey() const override { return to_return_sentinel_; }
 
+  void SetRangeDelReadSeqno(SequenceNumber read_seq) override {
+    read_seq_ = read_seq;
+  }
+
  private:
   // Return true if at least one invalid file is seen and skipped.
   bool SkipEmptyFileForward();
@@ -1112,7 +1119,7 @@
         /*arena=*/nullptr, skip_filters_, level_,
         /*max_file_size_for_l0_meta_pin=*/0, smallest_compaction_key,
         largest_compaction_key, allow_unprepared_value_,
-        block_protection_bytes_per_key_, range_tombstone_iter_);
+        block_protection_bytes_per_key_, &read_seq_, range_tombstone_iter_);
   }
 
   // Check if current file being fully within iterate_lower_bound.
@@ -1142,13 +1149,8 @@
   const std::shared_ptr<const SliceTransform>& prefix_extractor_;
 
   HistogramImpl* file_read_hist_;
-  bool should_sample_;
   TableReaderCaller caller_;
-  bool skip_filters_;
-  bool allow_unprepared_value_;
-  bool may_be_out_of_lower_bound_ = true;
   size_t file_index_;
-  int level_;
   RangeDelAggregator* range_del_agg_;
   IteratorWrapper file_iter_;  // May be nullptr
   PinnedIteratorsManager* pinned_iters_mgr_;
@@ -1157,10 +1159,6 @@
   // tombstones.
   const std::vector<AtomicCompactionUnitBoundary>* compaction_boundaries_;
 
-  bool is_next_read_sequential_;
-
-  uint8_t block_protection_bytes_per_key_;
-
   // This is set when this level iterator is used under a merging iterator
   // that processes range tombstones. range_tombstone_iter_ points to where the
   // merging iterator stores the range tombstones iterator for this level. When
@@ -1177,20 +1175,29 @@
   // *range_tombstone_iter_ points to range tombstones of the current SST file
   TruncatedRangeDelIterator** range_tombstone_iter_;
 
-  // Whether next/prev key is a sentinel key.
-  bool to_return_sentinel_ = false;
   // The sentinel key to be returned
   Slice sentinel_;
-  // Sets flags for if we should return the sentinel key next.
-  // The condition for returning sentinel is reaching the end of current
-  // file_iter_: !Valid() && status.().ok().
-  void TrySetDeleteRangeSentinel(const Slice& boundary_key);
-  void ClearSentinel() { to_return_sentinel_ = false; }
+
+  SequenceNumber read_seq_;
+  int level_;
+  uint8_t block_protection_bytes_per_key_;
+  bool should_sample_;
+  bool skip_filters_;
+  bool allow_unprepared_value_;
+  bool may_be_out_of_lower_bound_ = true;
+  bool is_next_read_sequential_;
 
   // Set in Seek() when a prefix seek reaches end of the current file,
   // and the next file has a different prefix. SkipEmptyFileForward()
   // will not move to next file when this flag is set.
   bool prefix_exhausted_ = false;
+
+  // Whether next/prev key is a sentinel key.
+  bool to_return_sentinel_ = false;
+
+  // Sets flags for if we should return the sentinel key next.
+  // The condition for returning sentinel is reaching the end of current
+  // file_iter_: !Valid() && status().ok().
+  void TrySetDeleteRangeSentinel(const Slice& boundary_key);
+  void ClearSentinel() { to_return_sentinel_ = false; }
 };
 
 void LevelIterator::TrySetDeleteRangeSentinel(const Slice& boundary_key) {
@@ -2006,7 +2013,8 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options,
        /*skip_filters=*/false, /*level=*/0, max_file_size_for_l0_meta_pin_,
        /*smallest_compaction_key=*/nullptr,
        /*largest_compaction_key=*/nullptr, allow_unprepared_value,
-        mutable_cf_options_.block_protection_bytes_per_key, &tombstone_iter);
+        mutable_cf_options_.block_protection_bytes_per_key,
+        /*range_del_read_seqno=*/nullptr, &tombstone_iter);
    if (read_options.ignore_range_deletions) {
      merge_iter_builder->AddIterator(table_iter);
    } else {
@@ -6956,6 +6964,7 @@ InternalIterator* VersionSet::MakeInputIterator(
            /*largest_compaction_key=*/nullptr,
            /*allow_unprepared_value=*/false,
            c->mutable_cf_options()->block_protection_bytes_per_key,
+            /*range_del_read_seqno=*/nullptr,
            /*range_del_iter=*/&range_tombstone_iter);
        range_tombstones.emplace_back(range_tombstone_iter, nullptr);
      }
diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc
index d546984358c..98de35b8d72 100644
--- a/db_stress_tool/db_stress_test_base.cc
+++ b/db_stress_tool/db_stress_test_base.cc
@@ -1345,6 +1345,14 @@ Status StressTest::TestIterate(ThreadState* thread,
   const bool support_seek_first_or_last = expect_total_order;
 
+  // Write-prepared and Write-unprepared do not support Refresh() yet.
+  if (!(FLAGS_use_txn && FLAGS_txn_write_policy != 0 /* write committed */) &&
+      thread->rand.OneIn(4)) {
+    Status s = iter->Refresh(snapshot_guard.snapshot());
+    assert(s.ok());
+    op_logs += "Refresh ";
+  }
+
   LastIterateOp last_op;
   if (support_seek_first_or_last && thread->rand.OneIn(100)) {
     iter->SeekToFirst();
diff --git a/include/rocksdb/iterator.h b/include/rocksdb/iterator.h
index 9d4c9f73a10..c50c825f31b 100644
--- a/include/rocksdb/iterator.h
+++ b/include/rocksdb/iterator.h
@@ -107,10 +107,16 @@ class Iterator : public Cleanable {
   // satisfied without doing some IO, then this returns Status::Incomplete().
   virtual Status status() const = 0;
 
-  // If supported, renew the iterator to represent the latest state. The
-  // iterator will be invalidated after the call. Not supported if
-  // ReadOptions.snapshot is given when creating the iterator.
-  virtual Status Refresh() {
+  // If supported, the DB state that the iterator reads from is updated to
+  // the latest state. The iterator will be invalidated after the call.
+  // Regardless of whether the iterator was created/refreshed previously
+  // with or without a snapshot, the iterator will be reading the
+  // latest DB state after this call.
+  virtual Status Refresh() { return Refresh(nullptr); }
+
+  // Similar to Refresh() but the iterator will be reading the latest DB state
+  // under the given snapshot.
+  virtual Status Refresh(const class Snapshot*) {
     return Status::NotSupported("Refresh() is not supported");
   }
 
diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc
index 57d65d5552e..a454762e259 100644
--- a/table/block_based/block_based_table_reader.cc
+++ b/table/block_based/block_based_table_reader.cc
@@ -1913,6 +1913,16 @@ FragmentedRangeTombstoneIterator* BlockBasedTable::NewRangeTombstoneIterator(
       snapshot, read_options.timestamp);
 }
 
+FragmentedRangeTombstoneIterator* BlockBasedTable::NewRangeTombstoneIterator(
+    SequenceNumber read_seqno, const Slice* timestamp) {
+  if (rep_->fragmented_range_dels == nullptr) {
+    return nullptr;
+  }
+  return new FragmentedRangeTombstoneIterator(rep_->fragmented_range_dels,
+                                              rep_->internal_comparator,
+                                              read_seqno, timestamp);
+}
+
 bool BlockBasedTable::FullFilterKeyMayMatch(
     FilterBlockReader* filter, const Slice& internal_key, const bool no_io,
     const SliceTransform* prefix_extractor, GetContext* get_context,
diff --git a/table/block_based/block_based_table_reader.h b/table/block_based/block_based_table_reader.h
index 4ea4212ae84..120907240fc 100644
--- a/table/block_based/block_based_table_reader.h
+++ b/table/block_based/block_based_table_reader.h
@@ -138,6 +138,9 @@ class BlockBasedTable : public TableReader {
   FragmentedRangeTombstoneIterator* NewRangeTombstoneIterator(
       const ReadOptions& read_options) override;
 
+  FragmentedRangeTombstoneIterator* NewRangeTombstoneIterator(
+      SequenceNumber read_seqno, const Slice* timestamp) override;
+
   // @param skip_filters Disables loading/accessing the filter block
   Status Get(const ReadOptions& readOptions, const Slice& key,
              GetContext* get_context, const SliceTransform* prefix_extractor,
diff --git a/table/internal_iterator.h b/table/internal_iterator.h
index 8015ed63511..060306003ce 100644
--- a/table/internal_iterator.h
+++ b/table/internal_iterator.h
@@ -43,6 +43,17 @@ class InternalIteratorBase : public Cleanable {
 
   virtual ~InternalIteratorBase() {}
 
+  // This iterator will only process range tombstones with sequence
+  // number <= `read_seqno`.
+  // Noop for most child classes.
+  // For range tombstone iterators (TruncatedRangeDelIterator,
+  // FragmentedRangeTombstoneIterator), will only return range tombstones with
+  // sequence number <= `read_seqno`. For LevelIterator, it may open new table
+  // files and create new range tombstone iterators during scanning. It will
+  // use `read_seqno` as the sequence number for creating new range tombstone
+  // iterators.
+  virtual void SetRangeDelReadSeqno(SequenceNumber /* read_seqno */) {}
+
   // An iterator is either positioned at a key/value pair, or
   // not valid. This method returns true iff the iterator is valid.
   // Always returns false if !status().ok().
diff --git a/table/iterator_wrapper.h b/table/iterator_wrapper.h
index 17abef4ac79..3e6f9c1ae5f 100644
--- a/table/iterator_wrapper.h
+++ b/table/iterator_wrapper.h
@@ -30,6 +30,11 @@ class IteratorWrapperBase {
   }
   ~IteratorWrapperBase() {}
   InternalIteratorBase<TValue>* iter() const { return iter_; }
+  void SetRangeDelReadSeqno(SequenceNumber read_seqno) {
+    if (iter_) {
+      iter_->SetRangeDelReadSeqno(read_seqno);
+    }
+  }
 
   // Set the underlying Iterator to _iter and return
   // previous underlying Iterator.
diff --git a/table/merging_iterator.cc b/table/merging_iterator.cc
index 505cd76d386..247564fe7b0 100644
--- a/table/merging_iterator.cc
+++ b/table/merging_iterator.cc
@@ -135,6 +135,18 @@ class MergingIterator : public InternalIterator {
     status_.PermitUncheckedError();
   }
 
+  void SetRangeDelReadSeqno(SequenceNumber read_seqno) override {
+    for (auto& child : children_) {
+      // This should only be needed for LevelIterator (iterators from L1+).
+      child.iter.SetRangeDelReadSeqno(read_seqno);
+    }
+    for (auto& child : range_tombstone_iters_) {
+      if (child) {
+        child->SetRangeDelReadSeqno(read_seqno);
+      }
+    }
+  }
+
   bool Valid() const override { return current_ != nullptr && status_.ok(); }
 
   Status status() const override { return status_; }
diff --git a/table/table_reader.h b/table/table_reader.h
index 53c52205293..87610f4fed1 100644
--- a/table/table_reader.h
+++ b/table/table_reader.h
@@ -60,11 +60,17 @@ class TableReader {
                                 size_t compaction_readahead_size = 0,
                                 bool allow_unprepared_value = false) = 0;
 
+  // read_options.snapshot needs to outlive this call.
   virtual FragmentedRangeTombstoneIterator* NewRangeTombstoneIterator(
       const ReadOptions& /*read_options*/) {
     return nullptr;
   }
 
+  virtual FragmentedRangeTombstoneIterator* NewRangeTombstoneIterator(
+      SequenceNumber /* read_seqno */, const Slice* /* timestamp */) {
+    return nullptr;
+  }
+
   // Given a key, return an approximate byte offset in the file where
   // the data for that key begins (or would begin if the key were
   // present in the file). The returned value is in terms of file
diff --git a/unreleased_history/new_features/iterator-refresh-snapshot.md b/unreleased_history/new_features/iterator-refresh-snapshot.md
new file mode 100644
index 00000000000..f8a0e7b431f
--- /dev/null
+++ b/unreleased_history/new_features/iterator-refresh-snapshot.md
@@ -0,0 +1 @@
+Add a new iterator API `Iterator::Refresh(const Snapshot *)` that allows the iterator to be refreshed while using the input snapshot to read.
\ No newline at end of file
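
Usage sketch: a minimal example of the Refresh(const Snapshot*) API this
patch adds, not part of the diff itself. The function name, `db`, and the
control flow are illustrative; error handling is reduced to asserts.

    #include <cassert>

    #include "rocksdb/db.h"

    // A long-lived iterator pinned to a snapshot is refreshed so that it
    // stops pinning old SuperVersions while keeping the same read view.
    void RefreshPinnedIterator(rocksdb::DB* db) {
      const rocksdb::Snapshot* snap = db->GetSnapshot();
      rocksdb::ReadOptions ro;
      ro.snapshot = snap;
      rocksdb::Iterator* it = db->NewIterator(ro);

      // ... scan; flushes/compactions may happen concurrently ...

      // Re-pin to the same snapshot: the iterator picks up the latest
      // SuperVersion but keeps reading at snap's sequence number, which is
      // also propagated as the upper bound for range tombstone iterators.
      rocksdb::Status s = it->Refresh(snap);
      assert(s.ok());
      it->SeekToFirst();  // Refresh() invalidates the iterator; re-seek.

      // Refresh(nullptr), like plain Refresh(), moves to the latest state.
      s = it->Refresh(nullptr);
      assert(s.ok());

      delete it;  // Destroy the iterator before releasing the snapshot.
      db->ReleaseSnapshot(snap);
    }

Per the stress-test hunk above, refreshing to the same snapshot is the main
target use case; refreshing to newer, older, or no snapshot is also covered
by the new unit tests.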