From 1104eaa35e5af4ec0b31379c06cad47d5449b0fa Mon Sep 17 00:00:00 2001 From: Yu Zhang Date: Thu, 14 Mar 2024 15:44:55 -0700 Subject: [PATCH] Add initial support for TimedPut API (#12419) Summary: This PR adds support for `TimedPut` API. We introduced a new type `kTypeValuePreferredSeqno` for entries added to the DB via the `TimedPut` API. The life cycle of such an entry on the write/flush/compaction paths are: 1) It is initially added to memtable as: `: {value, write_unix_time}` 2) When it's flushed to L0 sst files, it's converted to: `: {value, preferred_seqno}` when we have easy access to the seqno to time mapping. 3) During compaction, if certain conditions are met, we swap in the `preferred_seqno` and the entry will become: `: value`. This step helps fast track these entries to the cold tier if they are eligible after the sequence number swap. On the read path: A `kTypeValuePreferredSeqno` entry acts the same as a `kTypeValue` entry, the unix_write_time/preferred seqno part packed in value is completely ignored. Needed follow ups: 1) The seqno to time mapping accessible in flush needs to be extended to cover the `write_unix_time` for possible `kTypeValuePreferredSeqno` entries. This also means we need to track these `write_unix_time` in memtable. 2) Compaction filter support for the new `kTypeValuePreferredSeqno` type for feature parity with other `kTypeValue` and equivalent types. 3) Stress test coverage for the feature Pull Request resolved: https://github.com/facebook/rocksdb/pull/12419 Test Plan: Added unit tests Reviewed By: pdillinger Differential Revision: D54920296 Pulled By: jowlyzhang fbshipit-source-id: c8b43f7a7c465e569141770e93c748371ff1da9e --- db/builder.cc | 33 +- db/compaction/compaction_iteration_stats.h | 7 + db/compaction/compaction_iterator.cc | 55 ++- db/compaction/compaction_iterator.h | 1 + db/compaction/compaction_iterator_test.cc | 340 ++++++++++++++---- db/compaction/tiered_compaction_test.cc | 56 +++ db/db_basic_test.cc | 35 ++ db/db_iter.cc | 57 ++- db/db_iter_test.cc | 62 ++++ db/db_test_util.cc | 13 + db/db_test_util.h | 3 + db/dbformat.cc | 2 +- db/dbformat.h | 10 +- db/flush_job.cc | 3 + db/flush_job_test.cc | 75 ++++ db/memtable.cc | 10 +- db/memtable_list_test.cc | 33 +- db/merge_helper.cc | 12 + db/seqno_time_test.cc | 21 ++ db/seqno_to_time_mapping.cc | 41 +++ db/seqno_to_time_mapping.h | 24 ++ db/write_batch.cc | 138 ++++++- db/write_batch_internal.h | 4 + db/write_batch_test.cc | 47 ++- include/rocksdb/write_batch.h | 23 +- include/rocksdb/write_batch_base.h | 8 +- table/block_based/block.cc | 10 +- table/get_context.cc | 39 +- table/sst_file_dumper.cc | 7 + util/write_batch_util.h | 5 + utilities/debug.cc | 6 +- .../write_batch_with_index.cc | 9 +- .../write_batch_with_index_internal.cc | 10 +- 33 files changed, 1057 insertions(+), 142 deletions(-) diff --git a/db/builder.cc b/db/builder.cc index d754d495591..ce7b88d5dc1 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -211,12 +211,16 @@ Status BuildTable( ts_sz > 0 && !ioptions.persist_user_defined_timestamps; std::string key_after_flush_buf; + std::string value_buf; c_iter.SeekToFirst(); for (; c_iter.Valid(); c_iter.Next()) { const Slice& key = c_iter.key(); const Slice& value = c_iter.value(); - const ParsedInternalKey& ikey = c_iter.ikey(); - Slice key_after_flush = key; + ParsedInternalKey ikey = c_iter.ikey(); + key_after_flush_buf.assign(key.data(), key.size()); + Slice key_after_flush = key_after_flush_buf; + Slice value_after_flush = value; + // If user defined timestamps will be stripped from user key after flush, // the in memory version of the key act logically the same as one with a // minimum timestamp. We update the timestamp here so file boundary and @@ -227,17 +231,34 @@ Status BuildTable( key_after_flush = key_after_flush_buf; } + if (ikey.type == kTypeValuePreferredSeqno) { + auto [unpacked_value, unix_write_time] = + ParsePackedValueWithWriteTime(value); + SequenceNumber preferred_seqno = + seqno_to_time_mapping.GetProximalSeqnoBeforeTime(unix_write_time); + if (preferred_seqno < ikey.sequence) { + value_after_flush = + PackValueAndSeqno(unpacked_value, preferred_seqno, &value_buf); + } else { + // Cannot get a useful preferred seqno, convert it to a kTypeValue. + UpdateInternalKey(&key_after_flush_buf, ikey.sequence, kTypeValue); + ikey = ParsedInternalKey(ikey.user_key, ikey.sequence, kTypeValue); + key_after_flush = key_after_flush_buf; + value_after_flush = ParsePackedValueForValue(value); + } + } + // Generate a rolling 64-bit hash of the key and values // Note : // Here "key" integrates 'sequence_number'+'kType'+'user key'. - s = output_validator.Add(key_after_flush, value); + s = output_validator.Add(key_after_flush, value_after_flush); if (!s.ok()) { break; } - builder->Add(key_after_flush, value); + builder->Add(key_after_flush, value_after_flush); - s = meta->UpdateBoundaries(key_after_flush, value, ikey.sequence, - ikey.type); + s = meta->UpdateBoundaries(key_after_flush, value_after_flush, + ikey.sequence, ikey.type); if (!s.ok()) { break; } diff --git a/db/compaction/compaction_iteration_stats.h b/db/compaction/compaction_iteration_stats.h index 1b1c28b57ad..8777cf62764 100644 --- a/db/compaction/compaction_iteration_stats.h +++ b/db/compaction/compaction_iteration_stats.h @@ -44,6 +44,13 @@ struct CompactionIterationStats { uint64_t total_blob_bytes_read = 0; uint64_t num_blobs_relocated = 0; uint64_t total_blob_bytes_relocated = 0; + + // TimedPut diagnostics + // Total number of kTypeValuePreferredSeqno records encountered. + uint64_t num_input_timed_put_records = 0; + // Number of kTypeValuePreferredSeqno records we ended up swapping in + // preferred seqno. + uint64_t num_timed_put_swap_preferred_seqno = 0; }; } // namespace ROCKSDB_NAMESPACE diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index 85d1c039bd3..ebda5a6ff77 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -491,6 +491,8 @@ void CompactionIterator::NextFromInput() { if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion || ikey_.type == kTypeDeletionWithTimestamp) { iter_stats_.num_input_deletion_records++; + } else if (ikey_.type == kTypeValuePreferredSeqno) { + iter_stats_.num_input_timed_put_records++; } iter_stats_.total_input_raw_key_bytes += key_.size(); iter_stats_.total_input_raw_value_bytes += value_.size(); @@ -618,7 +620,8 @@ void CompactionIterator::NextFromInput() { // not compact out. We will keep this Put, but can drop it's data. // (See Optimization 3, below.) if (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex && - ikey_.type != kTypeWideColumnEntity) { + ikey_.type != kTypeWideColumnEntity && + ikey_.type != kTypeValuePreferredSeqno) { ROCKS_LOG_FATAL(info_log_, "Unexpected key %s for compaction output", ikey_.DebugString(allow_data_in_errors_, true).c_str()); assert(false); @@ -632,7 +635,8 @@ void CompactionIterator::NextFromInput() { assert(false); } - if (ikey_.type == kTypeBlobIndex || ikey_.type == kTypeWideColumnEntity) { + if (ikey_.type == kTypeBlobIndex || ikey_.type == kTypeWideColumnEntity || + ikey_.type == kTypeValuePreferredSeqno) { ikey_.type = kTypeValue; current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type); } @@ -798,7 +802,8 @@ void CompactionIterator::NextFromInput() { // happened if (next_ikey.type != kTypeValue && next_ikey.type != kTypeBlobIndex && - next_ikey.type != kTypeWideColumnEntity) { + next_ikey.type != kTypeWideColumnEntity && + next_ikey.type != kTypeValuePreferredSeqno) { ++iter_stats_.num_single_del_mismatch; } @@ -968,6 +973,50 @@ void CompactionIterator::NextFromInput() { validity_info_.SetValid(ValidContext::kKeepDel); at_next_ = true; } + } else if (ikey_.type == kTypeValuePreferredSeqno && + DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) && + (bottommost_level_ || + (compaction_ != nullptr && + compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key, + &level_ptrs_)))) { + // This section that attempts to swap preferred sequence number will not + // be invoked if this is a CompactionIterator created for flush, since + // `compaction_` will be nullptr and it's not bottommost either. + // + // The entries with the same user key and smaller sequence numbers are + // all in this earliest snapshot range to be iterated. Since those entries + // will be hidden by this entry [rule A], it's safe to swap in the + // preferred seqno now. + // + // It's otherwise not safe to swap in the preferred seqno since it's + // possible for entries in earlier snapshots to have sequence number that + // is smaller than this entry's sequence number but bigger than this + // entry's preferred sequence number. Swapping in the preferred sequence + // number will break the internal key ordering invariant for this key. + // + // A special case involving range deletion is handled separately below. + auto [unpacked_value, preferred_seqno] = + ParsePackedValueWithSeqno(value_); + assert(preferred_seqno < ikey_.sequence); + InternalKey ikey_after_swap(ikey_.user_key, preferred_seqno, kTypeValue); + Slice ikey_after_swap_slice(*ikey_after_swap.rep()); + if (range_del_agg_->ShouldDelete( + ikey_after_swap_slice, + RangeDelPositioningMode::kForwardTraversal)) { + // A range tombstone that doesn't cover this kTypeValuePreferredSeqno + // entry may end up covering the entry, so it's not safe to swap + // preferred sequence number. In this case, we output the entry as is. + validity_info_.SetValid(ValidContext::kNewUserKey); + } else { + iter_stats_.num_timed_put_swap_preferred_seqno++; + ikey_.sequence = preferred_seqno; + ikey_.type = kTypeValue; + current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type); + key_ = current_key_.GetInternalKey(); + ikey_.user_key = current_key_.GetUserKey(); + value_ = unpacked_value; + validity_info_.SetValid(ValidContext::kSwapPreferredSeqno); + } } else if (ikey_.type == kTypeMerge) { if (!merge_helper_->HasOperator()) { status_ = Status::InvalidArgument( diff --git a/db/compaction/compaction_iterator.h b/db/compaction/compaction_iterator.h index 1ff9c886924..eeb75efac4e 100644 --- a/db/compaction/compaction_iterator.h +++ b/db/compaction/compaction_iterator.h @@ -410,6 +410,7 @@ class CompactionIterator { kKeepDel = 9, kNewUserKey = 10, kRangeDeletion = 11, + kSwapPreferredSeqno = 12, }; struct ValidityInfo { diff --git a/db/compaction/compaction_iterator_test.cc b/db/compaction/compaction_iterator_test.cc index 699e629693d..7558a3e5c75 100644 --- a/db/compaction/compaction_iterator_test.cc +++ b/db/compaction/compaction_iterator_test.cc @@ -17,6 +17,14 @@ #include "utilities/merge_operators.h" namespace ROCKSDB_NAMESPACE { +namespace { +std::string ValueWithPreferredSeqno(std::string val, + SequenceNumber preferred_seqno = 0) { + std::string result = val; + PutFixed64(&result, preferred_seqno); + return result; +} +} // namespace // Expects no merging attempts. class NoMergingMergeOp : public MergeOperator { @@ -392,6 +400,17 @@ TEST_P(CompactionIteratorTest, CorruptionAfterSingleDeletion) { ASSERT_FALSE(c_iter_->Valid()); } +// Tests compatibility of TimedPut and SingleDelete. TimedPut should act as if +// it's a Put. +TEST_P(CompactionIteratorTest, TimedPutAndSingleDelete) { + InitIterators({test::KeyStr("a", 5, kTypeSingleDeletion), + test::KeyStr("a", 3, kTypeValuePreferredSeqno)}, + {"", "val"}, {}, {}, 5); + c_iter_->SeekToFirst(); + ASSERT_OK(c_iter_->status()); + ASSERT_FALSE(c_iter_->Valid()); +} + TEST_P(CompactionIteratorTest, SimpleRangeDeletion) { InitIterators({test::KeyStr("morning", 5, kTypeValue), test::KeyStr("morning", 2, kTypeValue), @@ -431,6 +450,31 @@ TEST_P(CompactionIteratorTest, RangeDeletionWithSnapshots) { ASSERT_FALSE(c_iter_->Valid()); } +// Tests compatibility of TimedPut and Range delete. TimedPut should act as if +// it's a Put. +TEST_P(CompactionIteratorTest, TimedPutAndRangeDeletion) { + InitIterators( + {test::KeyStr("morning", 5, kTypeValuePreferredSeqno), + test::KeyStr("morning", 2, kTypeValuePreferredSeqno), + test::KeyStr("night", 3, kTypeValuePreferredSeqno)}, + {ValueWithPreferredSeqno("zao5"), ValueWithPreferredSeqno("zao2"), + ValueWithPreferredSeqno("wan")}, + {test::KeyStr("ma", 4, kTypeRangeDeletion)}, {"mz"}, 5); + c_iter_->SeekToFirst(); + ASSERT_TRUE(c_iter_->Valid()); + ASSERT_EQ(test::KeyStr("morning", 5, kTypeValuePreferredSeqno), + c_iter_->key().ToString()); + ASSERT_EQ(ValueWithPreferredSeqno("zao5"), c_iter_->value().ToString()); + c_iter_->Next(); + ASSERT_TRUE(c_iter_->Valid()); + ASSERT_EQ(test::KeyStr("night", 3, kTypeValuePreferredSeqno), + c_iter_->key().ToString()); + ASSERT_EQ(ValueWithPreferredSeqno("wan"), c_iter_->value().ToString()); + c_iter_->Next(); + ASSERT_OK(c_iter_->status()); + ASSERT_FALSE(c_iter_->Valid()); +} + TEST_P(CompactionIteratorTest, CompactionFilterSkipUntil) { class Filter : public CompactionFilter { Decision FilterV2(int /*level*/, const Slice& key, ValueType t, @@ -502,9 +546,11 @@ TEST_P(CompactionIteratorTest, CompactionFilterSkipUntil) { test::KeyStr("f", 25, kTypeValue), test::KeyStr("g", 90, kTypeValue), test::KeyStr("h", 91, kTypeValue), // keep test::KeyStr("i", 95, kTypeMerge), // skip to "z" - test::KeyStr("j", 99, kTypeValue)}, + test::KeyStr("j", 99, kTypeValue), + test::KeyStr("k", 100, kTypeValuePreferredSeqno)}, {"av50", "am45", "bv60", "bv40", "cv35", "dm70", "em71", "fm65", "fm30", - "fv25", "gv90", "hv91", "im95", "jv99"}, + "fv25", "gv90", "hv91", "im95", "jv99", + ValueWithPreferredSeqno("kv100")}, {}, {}, kMaxSequenceNumber, kMaxSequenceNumber, &merge_op, &filter); // Compaction should output just "a", "e" and "h" keys. @@ -614,87 +660,87 @@ TEST_P(CompactionIteratorTest, ShuttingDownInMerge) { EXPECT_EQ(2, filter.last_seen.load()); } -TEST_P(CompactionIteratorTest, SingleMergeOperand) { - class Filter : public CompactionFilter { - Decision FilterV2(int /*level*/, const Slice& key, ValueType t, - const Slice& existing_value, std::string* /*new_value*/, - std::string* /*skip_until*/) const override { - std::string k = key.ToString(); - std::string v = existing_value.ToString(); - - // See InitIterators() call below for the sequence of keys and their - // filtering decisions. Here we closely assert that compaction filter is - // called with the expected keys and only them, and with the right values. - if (k == "a") { - EXPECT_EQ(ValueType::kMergeOperand, t); - EXPECT_EQ("av1", v); - return Decision::kKeep; - } else if (k == "b") { - EXPECT_EQ(ValueType::kMergeOperand, t); - return Decision::kKeep; - } else if (k == "c") { - return Decision::kKeep; - } - - ADD_FAILURE(); +class Filter : public CompactionFilter { + Decision FilterV2(int /*level*/, const Slice& key, ValueType t, + const Slice& existing_value, std::string* /*new_value*/, + std::string* /*skip_until*/) const override { + std::string k = key.ToString(); + std::string v = existing_value.ToString(); + + // See InitIterators() call below for the sequence of keys and their + // filtering decisions. Here we closely assert that compaction filter is + // called with the expected keys and only them, and with the right values. + if (k == "a") { + EXPECT_EQ(ValueType::kMergeOperand, t); + EXPECT_EQ("av1", v); + return Decision::kKeep; + } else if (k == "b") { + EXPECT_EQ(ValueType::kMergeOperand, t); + return Decision::kKeep; + } else if (k == "c") { return Decision::kKeep; } - const char* Name() const override { - return "CompactionIteratorTest.SingleMergeOperand::Filter"; - } - }; + ADD_FAILURE(); + return Decision::kKeep; + } - class SingleMergeOp : public MergeOperator { - public: - bool FullMergeV2(const MergeOperationInput& merge_in, - MergeOperationOutput* merge_out) const override { - // See InitIterators() call below for why "c" is the only key for which - // FullMergeV2 should be called. - EXPECT_EQ("c", merge_in.key.ToString()); - - std::string temp_value; - if (merge_in.existing_value != nullptr) { - temp_value = merge_in.existing_value->ToString(); - } + const char* Name() const override { + return "CompactionIteratorTest.SingleMergeOperand::Filter"; + } +}; - for (auto& operand : merge_in.operand_list) { - temp_value.append(operand.ToString()); - } - merge_out->new_value = temp_value; +class SingleMergeOp : public MergeOperator { + public: + bool FullMergeV2(const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const override { + // See InitIterators() call below for why "c" is the only key for which + // FullMergeV2 should be called. + EXPECT_EQ("c", merge_in.key.ToString()); + + std::string temp_value; + if (merge_in.existing_value != nullptr) { + temp_value = merge_in.existing_value->ToString(); + } - return true; + for (auto& operand : merge_in.operand_list) { + temp_value.append(operand.ToString()); } + merge_out->new_value = temp_value; - bool PartialMergeMulti(const Slice& key, - const std::deque& operand_list, - std::string* new_value, - Logger* /*logger*/) const override { - std::string string_key = key.ToString(); - EXPECT_TRUE(string_key == "a" || string_key == "b"); - - if (string_key == "a") { - EXPECT_EQ(1, operand_list.size()); - } else if (string_key == "b") { - EXPECT_EQ(2, operand_list.size()); - } + return true; + } - std::string temp_value; - for (auto& operand : operand_list) { - temp_value.append(operand.ToString()); - } - swap(temp_value, *new_value); + bool PartialMergeMulti(const Slice& key, + const std::deque& operand_list, + std::string* new_value, + Logger* /*logger*/) const override { + std::string string_key = key.ToString(); + EXPECT_TRUE(string_key == "a" || string_key == "b"); - return true; + if (string_key == "a") { + EXPECT_EQ(1, operand_list.size()); + } else if (string_key == "b") { + EXPECT_EQ(2, operand_list.size()); } - const char* Name() const override { - return "CompactionIteratorTest SingleMergeOp"; + std::string temp_value; + for (auto& operand : operand_list) { + temp_value.append(operand.ToString()); } + swap(temp_value, *new_value); - bool AllowSingleOperand() const override { return true; } - }; + return true; + } + + const char* Name() const override { + return "CompactionIteratorTest SingleMergeOp"; + } + bool AllowSingleOperand() const override { return true; } +}; + +TEST_P(CompactionIteratorTest, SingleMergeOperand) { SingleMergeOp merge_op; Filter filter; InitIterators( @@ -719,6 +765,24 @@ TEST_P(CompactionIteratorTest, SingleMergeOperand) { ASSERT_EQ("cv1cv2", c_iter_->value().ToString()); } +// Tests compatibility of TimedPut and Merge operation. When a TimedPut is +// merged with some merge operand in compaction, it will become a regular Put +// and lose its preferred sequence number. +TEST_P(CompactionIteratorTest, TimedPutAndMerge) { + SingleMergeOp merge_op; + Filter filter; + InitIterators({test::KeyStr("c", 90, kTypeMerge), + test::KeyStr("c", 80, kTypeValuePreferredSeqno)}, + {"cv2", ValueWithPreferredSeqno("cv1")}, {}, {}, + kMaxSequenceNumber, kMaxSequenceNumber, &merge_op, &filter); + + c_iter_->SeekToFirst(); + ASSERT_TRUE(c_iter_->Valid()); + ASSERT_EQ(test::KeyStr("c", 90, kTypeValue), c_iter_->key().ToString()); + ASSERT_OK(c_iter_->status()); + ASSERT_EQ("cv1cv2", c_iter_->value().ToString()); +} + // In bottommost level, values earlier than earliest snapshot can be output // with sequence = 0. TEST_P(CompactionIteratorTest, ZeroOutSequenceAtBottomLevel) { @@ -963,6 +1027,22 @@ TEST_F(CompactionIteratorWithSnapshotCheckerTest, DedupSameSnapshot_Value) { {"v4", "v3", "v1"}, 3 /*last_committed_seq*/); } +TEST_F(CompactionIteratorWithSnapshotCheckerTest, DedupSameSnapshot_TimedPut) { + AddSnapshot(2, 1); + RunTest({test::KeyStr("foo", 4, kTypeValuePreferredSeqno), + test::KeyStr("foo", 3, kTypeValuePreferredSeqno), + test::KeyStr("foo", 2, kTypeValuePreferredSeqno), + test::KeyStr("foo", 1, kTypeValuePreferredSeqno)}, + {ValueWithPreferredSeqno("v4"), ValueWithPreferredSeqno("v3"), + ValueWithPreferredSeqno("v2"), ValueWithPreferredSeqno("v1")}, + {test::KeyStr("foo", 4, kTypeValuePreferredSeqno), + test::KeyStr("foo", 3, kTypeValuePreferredSeqno), + test::KeyStr("foo", 1, kTypeValuePreferredSeqno)}, + {ValueWithPreferredSeqno("v4"), ValueWithPreferredSeqno("v3"), + ValueWithPreferredSeqno("v1")}, + 3 /*last_committed_seq*/); +} + TEST_F(CompactionIteratorWithSnapshotCheckerTest, DedupSameSnapshot_Deletion) { AddSnapshot(2, 1); RunTest( @@ -1128,6 +1208,114 @@ TEST_F(CompactionIteratorWithSnapshotCheckerTest, 2 /* earliest_write_conflict_snapshot */); } +// Same as above but with a value with preferred seqno entry. In addition to the +// value getting trimmed, the type of the KV is changed to kTypeValue. +TEST_F(CompactionIteratorWithSnapshotCheckerTest, + KeepSingleDeletionForWriteConflictChecking_TimedPut) { + AddSnapshot(2, 0); + RunTest({test::KeyStr("a", 2, kTypeSingleDeletion), + test::KeyStr("a", 1, kTypeValuePreferredSeqno)}, + {"", ValueWithPreferredSeqno("v1")}, + {test::KeyStr("a", 2, kTypeSingleDeletion), + test::KeyStr("a", 1, kTypeValue)}, + {"", ""}, 2 /* last_committed_seq */, nullptr /* merge_operator */, + nullptr /* compaction_filter */, false /* bottommost_level */, + 2 /* earliest_write_conflict_snapshot */); +} + +// Tests when a kTypeValuePreferredSeqno entry can have its preferred sequence +// number swapped in. The required and sufficient conditions for an entry's +// preferred sequence number to get swapped in are: +// 1) The entry is visible to the earliest snapshot, AND +// 2) No more entries with the same user key on lower levels, AND +// This is either because: +// 2a) This is a compaction to the bottommost level, OR +// 2b) Keys do not exist beyond output level +// 3) The entry will not resurface a range deletion entry after swapping in the +// preferred sequence number. +TEST_F(CompactionIteratorWithSnapshotCheckerTest, + TimedPut_NotVisibleToEarliestSnapshot_NoSwapPreferredSeqno) { + AddSnapshot(3); + RunTest({test::KeyStr("bar", 5, kTypeValuePreferredSeqno)}, + {ValueWithPreferredSeqno("bv2", 2)}, + {test::KeyStr("bar", 5, kTypeValuePreferredSeqno)}, + {ValueWithPreferredSeqno("bv2", 2), "bv1"}, 5 /*last_committed_seq*/, + nullptr /*merge_operator*/, nullptr /*compaction_filter*/, + true /*bottommost_level*/, + kMaxSequenceNumber /*earliest_write_conflict_snapshot*/, + true /*key_not_exists_beyond_output_level*/); +} + +TEST_F(CompactionIteratorWithSnapshotCheckerTest, + TimedPut_MoreEntriesInLowerLevels_NoSwapPreferredSeqno) { + // This tests mimics more entries in lower levels with `bottommost_level` and + // `key_not_exists_beyond_output_level` set to false. + RunTest({test::KeyStr("bar", 5, kTypeValuePreferredSeqno)}, + {ValueWithPreferredSeqno("bv2", 2)}, + {test::KeyStr("bar", 5, kTypeValuePreferredSeqno)}, + {ValueWithPreferredSeqno("bv2", 2)}, 5 /*last_committed_seq*/, + nullptr /*merge_operator*/, nullptr /*compaction_filter*/, + false /*bottommost_level*/, + kMaxSequenceNumber /*earliest_write_conflict_snapshot*/, + false /*key_not_exists_beyond_output_level*/); +} + +TEST_F(CompactionIteratorWithSnapshotCheckerTest, + TimedPut_WillBeHiddenByRangeDeletionAfterSwap_NoSwap) { + InitIterators({test::KeyStr("morning", 5, kTypeValuePreferredSeqno), + test::KeyStr("night", 6, kTypeValue)}, + {ValueWithPreferredSeqno("zao", 3), "wan"}, + {test::KeyStr("ma", 4, kTypeRangeDeletion)}, {"mz"}, 6, + kMaxSequenceNumber /*last_committed_sequence*/, + nullptr /*merge_op*/, nullptr /*filter*/, + false /*bottommost_level*/, + kMaxSequenceNumber /*earliest_write_conflict_snapshot*/, + true /*key_not_exists_beyond_output_level*/); + c_iter_->SeekToFirst(); + ASSERT_TRUE(c_iter_->Valid()); + ASSERT_EQ(test::KeyStr("morning", 5, kTypeValuePreferredSeqno), + c_iter_->key().ToString()); + ASSERT_EQ(ValueWithPreferredSeqno("zao", 3), c_iter_->value().ToString()); + c_iter_->Next(); + ASSERT_TRUE(c_iter_->Valid()); + ASSERT_EQ(test::KeyStr("night", 6, kTypeValue), c_iter_->key().ToString()); + ASSERT_EQ("wan", c_iter_->value().ToString()); + c_iter_->Next(); + ASSERT_FALSE(c_iter_->Valid()); + ASSERT_OK(c_iter_->status()); +} + +TEST_F(CompactionIteratorWithSnapshotCheckerTest, + TimedPut_BottomMostLevelVisibleToEarliestSnapshot_SwapPreferredSeqno) { + // Preferred seqno got swapped in and also zeroed out as a bottommost level + // optimization. + RunTest( + {test::KeyStr("bar", 5, kTypeValuePreferredSeqno), + test::KeyStr("bar", 4, kTypeValuePreferredSeqno), + test::KeyStr("foo", 6, kTypeValue)}, + {ValueWithPreferredSeqno("bv2", 2), ValueWithPreferredSeqno("bv1", 1), + "fv1"}, + {test::KeyStr("bar", 0, kTypeValue), test::KeyStr("foo", 0, kTypeValue)}, + {"bv2", "fv1"}, 6 /*last_committed_seq*/, nullptr /*merge_operator*/, + nullptr /*compaction_filter*/, true /*bottommost_level*/); +} + +TEST_F( + CompactionIteratorWithSnapshotCheckerTest, + TimedPut_NonBottomMostLevelVisibleToEarliestSnapshot_SwapPreferredSeqno) { + RunTest( + {test::KeyStr("bar", 5, kTypeValuePreferredSeqno), + test::KeyStr("bar", 4, kTypeValuePreferredSeqno), + test::KeyStr("foo", 6, kTypeValue)}, + {ValueWithPreferredSeqno("bv2", 2), ValueWithPreferredSeqno("bv1", 1), + "fv1"}, + {test::KeyStr("bar", 2, kTypeValue), test::KeyStr("foo", 6, kTypeValue)}, + {"bv2", "fv1"}, 6 /*last_committed_seq*/, nullptr /*merge_operator*/, + nullptr /*compaction_filter*/, false /*bottommost_level*/, + kMaxSequenceNumber /*earliest_write_conflict_snapshot*/, + true /*key_not_exists_beyond_output_level*/); +} + // Compaction filter should keep uncommitted key as-is, and // * Convert the latest value to deletion, and/or // * if latest value is a merge, apply filter to all subsequent merges. @@ -1145,6 +1333,22 @@ TEST_F(CompactionIteratorWithSnapshotCheckerTest, CompactionFilter_Value) { nullptr /*merge_operator*/, compaction_filter.get()); } +TEST_F(CompactionIteratorWithSnapshotCheckerTest, CompactionFilter_TimedPut) { + // TODO(yuzhangyu): Add support for this type in compaction filter. + // Type kTypeValuePreferredSeqno is not explicitly exposed in the compaction + // filter API, so users can not operate on it through compaction filter API + // to remove/purge/change value etc. But this type of entry can be impacted by + // other entries' filter result, currently only kRemoveAndSkip type of result + // can affect it. + std::unique_ptr compaction_filter( + new FilterAllKeysCompactionFilter()); + RunTest({test::KeyStr("a", 2, kTypeValuePreferredSeqno)}, + {ValueWithPreferredSeqno("v1")}, + {test::KeyStr("a", 2, kTypeValuePreferredSeqno)}, + {ValueWithPreferredSeqno("v1")}, 2 /*last_committed_seq*/, + nullptr /*merge_operator*/, compaction_filter.get()); +} + TEST_F(CompactionIteratorWithSnapshotCheckerTest, CompactionFilter_Deletion) { std::unique_ptr compaction_filter( new FilterAllKeysCompactionFilter()); diff --git a/db/compaction/tiered_compaction_test.cc b/db/compaction/tiered_compaction_test.cc index 27db8b96afa..cd4b6494870 100644 --- a/db/compaction/tiered_compaction_test.cc +++ b/db/compaction/tiered_compaction_test.cc @@ -1583,6 +1583,62 @@ TEST_F(PrecludeLastLevelTest, SmallPrecludeTime) { Close(); } +TEST_F(PrecludeLastLevelTest, FastTrackTimedPutToLastLevel) { + const int kNumTrigger = 4; + const int kNumLevels = 7; + const int kNumKeys = 100; + + Options options = CurrentOptions(); + options.compaction_style = kCompactionStyleUniversal; + options.preclude_last_level_data_seconds = 60; + options.preserve_internal_time_seconds = 0; + options.env = mock_env_.get(); + options.level0_file_num_compaction_trigger = kNumTrigger; + options.num_levels = kNumLevels; + options.last_level_temperature = Temperature::kCold; + DestroyAndReopen(options); + + Random rnd(301); + + dbfull()->TEST_WaitForPeriodicTaskRun([&] { + mock_clock_->MockSleepForSeconds(static_cast(rnd.Uniform(10) + 1)); + }); + + for (int i = 0; i < kNumKeys / 2; i++) { + ASSERT_OK(Put(Key(i), rnd.RandomString(100))); + dbfull()->TEST_WaitForPeriodicTaskRun([&] { + mock_clock_->MockSleepForSeconds(static_cast(rnd.Uniform(2))); + }); + } + // Create one file with regular Put. + ASSERT_OK(Flush()); + + // Create one file with TimedPut. + // With above mock clock operations, write_unix_time 50 should be before + // current_time - preclude_last_level_seconds. + // These data are eligible to be put on the last level once written to db + // and compaction will fast track them to the last level. + for (int i = kNumKeys / 2; i < kNumKeys; i++) { + ASSERT_OK(TimedPut(0, Key(i), rnd.RandomString(100), 50)); + } + ASSERT_OK(Flush()); + + // TimedPut file moved to the last level immediately. + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel()); + + // Wait more than preclude_last_level time, Put file eventually moved to the + // last level. + mock_clock_->MockSleepForSeconds(100); + + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel()); + ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); + + Close(); +} + TEST_F(PrecludeLastLevelTest, LastLevelOnlyCompactionPartial) { const int kNumTrigger = 4; const int kNumLevels = 7; diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index 60eaa8486e8..dec2cfb9ca1 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -442,6 +442,41 @@ TEST_F(DBBasicTest, PutSingleDeleteGet) { kSkipMergePut)); } +TEST_F(DBBasicTest, TimedPutBasic) { + do { + Options options = CurrentOptions(); + options.merge_operator = MergeOperators::CreateStringAppendOperator(); + CreateAndReopenWithCF({"pikachu"}, options); + ASSERT_OK(TimedPut(1, "foo", "v1", /*write_unix_time=*/0)); + // Read from memtable + ASSERT_EQ("v1", Get(1, "foo")); + ASSERT_OK(TimedPut(1, "foo", "v2.1", /*write_unix_time=*/3)); + ASSERT_EQ("v2.1", Get(1, "foo")); + + // Read from sst file + ASSERT_OK(db_->Flush(FlushOptions(), handles_[1])); + ASSERT_OK(Merge(1, "foo", "v2.2")); + ASSERT_EQ("v2.1,v2.2", Get(1, "foo")); + ASSERT_OK(Delete(1, "foo")); + ASSERT_EQ("NOT_FOUND", Get(1, "foo")); + + ASSERT_OK(TimedPut(1, "bar", "bv1", /*write_unix_time=*/0)); + ASSERT_EQ("bv1", Get(1, "bar")); + ASSERT_OK(TimedPut(1, "baz", "bzv1", /*write_unix_time=*/0)); + ASSERT_EQ("bzv1", Get(1, "baz")); + std::string range_del_begin = "b"; + std::string range_del_end = "baz"; + Slice begin_rdel = range_del_begin, end_rdel = range_del_end; + ASSERT_OK( + db_->DeleteRange(WriteOptions(), handles_[1], begin_rdel, end_rdel)); + ASSERT_EQ("NOT_FOUND", Get(1, "bar")); + + ASSERT_EQ("bzv1", Get(1, "baz")); + ASSERT_OK(SingleDelete(1, "baz")); + ASSERT_EQ("NOT_FOUND", Get(1, "baz")); + } while (ChangeOptions(kSkipPlainTable)); +} + TEST_F(DBBasicTest, EmptyFlush) { // It is possible to produce empty flushes when using single deletes. Tests // whether empty flushes cause issues. diff --git a/db/db_iter.cc b/db/db_iter.cc index 1547ec0a2e7..90e19e95d2e 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -392,6 +392,7 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key, } break; case kTypeValue: + case kTypeValuePreferredSeqno: case kTypeBlobIndex: case kTypeWideColumnEntity: if (!PrepareValue()) { @@ -417,8 +418,13 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key, return false; } } else { - assert(ikey_.type == kTypeValue); - SetValueAndColumnsFromPlain(iter_.value()); + assert(ikey_.type == kTypeValue || + ikey_.type == kTypeValuePreferredSeqno); + Slice value = iter_.value(); + if (ikey_.type == kTypeValuePreferredSeqno) { + value = ParsePackedValueForValue(value); + } + SetValueAndColumnsFromPlain(value); } valid_ = true; @@ -574,10 +580,14 @@ bool DBIter::MergeValuesNewToOld() { return false; } - if (kTypeValue == ikey.type) { - // hit a put, merge the put value with operands and store the - // final result in saved_value_. We are done! - if (!MergeWithPlainBaseValue(iter_.value(), ikey.user_key)) { + if (kTypeValue == ikey.type || kTypeValuePreferredSeqno == ikey.type) { + Slice value = iter_.value(); + if (kTypeValuePreferredSeqno == ikey.type) { + value = ParsePackedValueForValue(value); + } + // hit a put or put equivalent, merge the put value with operands and + // store the final result in saved_value_. We are done! + if (!MergeWithPlainBaseValue(value, ikey.user_key)) { return false; } // iter_ is positioned after put @@ -839,8 +849,8 @@ bool DBIter::FindValueForCurrentKey() { merge_context_.Clear(); current_entry_is_merged_ = false; // last entry before merge (could be kTypeDeletion, - // kTypeDeletionWithTimestamp, kTypeSingleDeletion, kTypeValue, - // kTypeBlobIndex, or kTypeWideColumnEntity) + // kTypeDeletionWithTimestamp, kTypeSingleDeletion, kTypeValue + // kTypeBlobIndex, kTypeWideColumnEntity or kTypeValuePreferredSeqno) ValueType last_not_merge_type = kTypeDeletion; ValueType last_key_entry_type = kTypeDeletion; @@ -917,10 +927,15 @@ bool DBIter::FindValueForCurrentKey() { last_key_entry_type = ikey.type; switch (last_key_entry_type) { case kTypeValue: + case kTypeValuePreferredSeqno: case kTypeBlobIndex: case kTypeWideColumnEntity: if (iter_.iter()->IsValuePinned()) { - pinned_value_ = iter_.value(); + if (last_key_entry_type == kTypeValuePreferredSeqno) { + pinned_value_ = ParsePackedValueForValue(iter_.value()); + } else { + pinned_value_ = iter_.value(); + } } else { valid_ = false; status_ = Status::NotSupported( @@ -1030,7 +1045,8 @@ bool DBIter::FindValueForCurrentKey() { return true; } else { - assert(last_not_merge_type == kTypeValue); + assert(last_not_merge_type == kTypeValue || + last_not_merge_type == kTypeValuePreferredSeqno); if (!MergeWithPlainBaseValue(pinned_value_, saved_key_.GetUserKey())) { return false; } @@ -1038,6 +1054,7 @@ bool DBIter::FindValueForCurrentKey() { } break; case kTypeValue: + case kTypeValuePreferredSeqno: SetValueAndColumnsFromPlain(pinned_value_); break; @@ -1142,10 +1159,14 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { Slice ts = ExtractTimestampFromUserKey(ikey.user_key, timestamp_size_); saved_timestamp_.assign(ts.data(), ts.size()); } - if (ikey.type == kTypeValue || ikey.type == kTypeBlobIndex || - ikey.type == kTypeWideColumnEntity) { + if (ikey.type == kTypeValue || ikey.type == kTypeValuePreferredSeqno || + ikey.type == kTypeBlobIndex || ikey.type == kTypeWideColumnEntity) { assert(iter_.iter()->IsValuePinned()); - pinned_value_ = iter_.value(); + if (ikey.type == kTypeValuePreferredSeqno) { + pinned_value_ = ParsePackedValueForValue(iter_.value()); + } else { + pinned_value_ = iter_.value(); + } if (ikey.type == kTypeBlobIndex) { if (!SetBlobValueIfNeeded(ikey.user_key, pinned_value_)) { return false; @@ -1158,7 +1179,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { return false; } } else { - assert(ikey.type == kTypeValue); + assert(ikey.type == kTypeValue || ikey.type == kTypeValuePreferredSeqno); SetValueAndColumnsFromPlain(pinned_value_); } @@ -1204,8 +1225,12 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { return false; } - if (ikey.type == kTypeValue) { - if (!MergeWithPlainBaseValue(iter_.value(), saved_key_.GetUserKey())) { + if (ikey.type == kTypeValue || ikey.type == kTypeValuePreferredSeqno) { + Slice value = iter_.value(); + if (ikey.type == kTypeValuePreferredSeqno) { + value = ParsePackedValueForValue(value); + } + if (!MergeWithPlainBaseValue(value, saved_key_.GetUserKey())) { return false; } return true; diff --git a/db/db_iter_test.cc b/db/db_iter_test.cc index 81b73affc3b..cf8321808f9 100644 --- a/db/db_iter_test.cc +++ b/db/db_iter_test.cc @@ -45,6 +45,14 @@ class TestIterator : public InternalIterator { Add(argkey, kTypeValue, argvalue); } + void AddTimedPut(std::string argkey, std::string argvalue, + uint64_t write_unix_time) { + std::string packed_value; + [[maybe_unused]] auto packed_value_slice = + PackValueAndWriteTime(argvalue, write_unix_time, &packed_value); + Add(argkey, kTypeValuePreferredSeqno, packed_value); + } + void AddDeletion(std::string argkey) { Add(argkey, kTypeDeletion, std::string()); } @@ -1388,6 +1396,60 @@ TEST_F(DBIteratorTest, DBIteratorSkipInternalKeys) { } } +TEST_F(DBIteratorTest, DBIteratorTimedPutBasic) { + ReadOptions ro; + Options options; + options.merge_operator = MergeOperators::CreateFromStringId("stringappend"); + + TestIterator* internal_iter = new TestIterator(BytewiseComparator()); + internal_iter->AddTimedPut("a", "0", /*write_unix_time=*/0); + internal_iter->AddMerge("a", "1"); + internal_iter->AddTimedPut("b", "0", /*write_unix_time=*/0); + internal_iter->AddDeletion("b"); + internal_iter->AddTimedPut("c", "01", /*write_unix_time=*/0); + internal_iter->AddTimedPut("c", "02", /*write_unix_time=*/0); + internal_iter->AddTimedPut("c", "2", /*write_unix_time=*/0); + internal_iter->AddTimedPut("d", "3", /*write_unix_time=*/0); + internal_iter->Finish(); + + std::unique_ptr db_iter(NewDBIterator( + env_, ro, ImmutableOptions(options), MutableCFOptions(options), + BytewiseComparator(), internal_iter, nullptr /* version */, + 7 /* sequence */, /*max_sequential_skip_in_iterations*/ 1, + nullptr /* read_callback */)); + db_iter->SeekToFirst(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "a"); + ASSERT_EQ(db_iter->value().ToString(), "0,1"); + db_iter->Next(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "c"); + ASSERT_EQ(db_iter->value().ToString(), "2"); + db_iter->Next(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "d"); + ASSERT_EQ(db_iter->value().ToString(), "3"); + db_iter->Next(); + ASSERT_FALSE(db_iter->Valid()); + ASSERT_OK(db_iter->status()); + + db_iter->SeekToLast(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "d"); + ASSERT_EQ(db_iter->value().ToString(), "3"); + db_iter->Prev(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "c"); + ASSERT_EQ(db_iter->value().ToString(), "2"); + db_iter->Prev(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "a"); + ASSERT_EQ(db_iter->value().ToString(), "0,1"); + db_iter->Prev(); + ASSERT_FALSE(db_iter->Valid()); + ASSERT_OK(db_iter->status()); +} + TEST_F(DBIteratorTest, DBIterator1) { ReadOptions ro; Options options; diff --git a/db/db_test_util.cc b/db/db_test_util.cc index b82dfa6ace7..efa14dce011 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -759,6 +759,19 @@ Status DBTestBase::Put(int cf, const Slice& k, const Slice& v, } } +Status DBTestBase::TimedPut(int cf, const Slice& k, const Slice& v, + uint64_t write_unix_time, WriteOptions wo) { + WriteBatch wb; + ColumnFamilyHandle* cfh; + if (cf != 0) { + cfh = handles_[cf]; + } else { + cfh = db_->DefaultColumnFamily(); + } + EXPECT_OK(wb.TimedPut(cfh, k, v, write_unix_time)); + return db_->Write(wo, &wb); +} + Status DBTestBase::Merge(const Slice& k, const Slice& v, WriteOptions wo) { return db_->Merge(wo, k, v); } diff --git a/db/db_test_util.h b/db/db_test_util.h index e47be37d8e6..959f75d1012 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -1176,6 +1176,9 @@ class DBTestBase : public testing::Test { Status Put(int cf, const Slice& k, const Slice& v, WriteOptions wo = WriteOptions()); + Status TimedPut(int cf, const Slice& k, const Slice& v, + uint64_t write_unix_time, WriteOptions wo = WriteOptions()); + Status Merge(const Slice& k, const Slice& v, WriteOptions wo = WriteOptions()); diff --git a/db/dbformat.cc b/db/dbformat.cc index 5f9db0bb033..2378ba488b9 100644 --- a/db/dbformat.cc +++ b/db/dbformat.cc @@ -25,7 +25,7 @@ namespace ROCKSDB_NAMESPACE { // and the value type is embedded as the low 8 bits in the sequence // number in internal keys, we need to use the highest-numbered // ValueType, not the lowest). -const ValueType kValueTypeForSeek = kTypeWideColumnEntity; +const ValueType kValueTypeForSeek = kTypeValuePreferredSeqno; const ValueType kValueTypeForSeekForPrev = kTypeDeletion; const std::string kDisableUserTimestamp; diff --git a/db/dbformat.h b/db/dbformat.h index 4ae50997312..5b16726693e 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -68,7 +68,9 @@ enum ValueType : unsigned char { kTypeDeletionWithTimestamp = 0x14, kTypeCommitXIDAndTimestamp = 0x15, // WAL only kTypeWideColumnEntity = 0x16, - kTypeColumnFamilyWideColumnEntity = 0x17, // WAL only + kTypeColumnFamilyWideColumnEntity = 0x17, // WAL only + kTypeValuePreferredSeqno = 0x18, // Value with a unix write time + kTypeColumnFamilyValuePreferredSeqno = 0x19, // WAL only kTypeMaxValid, // Should be after the last valid type, only used for // validation kMaxValue = 0x7F // Not used for storing records. @@ -108,7 +110,8 @@ struct UserKeyRangePtr { // (i.e. a type used in memtable skiplist and sst file datablock). inline bool IsValueType(ValueType t) { return t <= kTypeMerge || kTypeSingleDeletion == t || kTypeBlobIndex == t || - kTypeDeletionWithTimestamp == t || kTypeWideColumnEntity == t; + kTypeDeletionWithTimestamp == t || kTypeWideColumnEntity == t || + kTypeValuePreferredSeqno == t; } // Checks whether a type is from user operation @@ -909,7 +912,8 @@ bool ReadKeyFromWriteBatchEntry(Slice* input, Slice* key, bool cf_record); // resulting from this call will include timestamp. Status ReadRecordFromWriteBatch(Slice* input, char* tag, uint32_t* column_family, Slice* key, - Slice* value, Slice* blob, Slice* xid); + Slice* value, Slice* blob, Slice* xid, + uint64_t* write_unix_time); // When user call DeleteRange() to delete a range of keys, // we will store a serialized RangeTombstone in MemTable and SST. diff --git a/db/flush_job.cc b/db/flush_job.cc index 085f368d490..9340ea18a21 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -850,6 +850,9 @@ Status FlushJob::WriteLevel0Table() { const uint64_t start_cpu_micros = clock_->CPUMicros(); Status s; + // TODO(yuzhangyu): extend the copied seqno to time mapping range here so + // it can try to cover the earliest write unix time as much as possible. We + // need this mapping to get a more precise preferred seqno. SequenceNumber smallest_seqno = mems_.front()->GetEarliestSequenceNumber(); if (!db_impl_seqno_to_time_mapping_.Empty()) { // make a local copy to use while not holding the db_mutex. diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index b864b328881..82edcc5e0c3 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -25,6 +25,19 @@ #include "util/string_util.h" namespace ROCKSDB_NAMESPACE { +namespace { +std::string ValueWithWriteTime(std::string val, uint64_t write_time = 0) { + std::string result = val; + PutFixed64(&result, write_time); + return result; +} +std::string ValueWithPreferredSeqno(std::string val, + SequenceNumber preferred_seqno = 0) { + std::string result = val; + PutFixed64(&result, preferred_seqno); + return result; +} +} // namespace // TODO(icanadi) Mock out everything else: // 1. VersionSet @@ -608,6 +621,68 @@ TEST_F(FlushJobTest, GetRateLimiterPriorityForWrite) { } } +TEST_F(FlushJobTest, ReplaceTimedPutWriteTimeWithPreferredSeqno) { + JobContext job_context(0); + auto cfd = versions_->GetColumnFamilySet()->GetDefault(); + auto new_mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(), + kMaxSequenceNumber); + new_mem->Ref(); + SeqnoToTimeMapping seqno_to_time_mapping; + // Seqno: 10, 11, ... 20, + // Time: ... 500 ... 600 + // GetProximalSeqnoBeforeTime(500) -> 10 + // GetProximalSeqnoBeforeTime(600) -> 20 + seqno_to_time_mapping.Append(10, 500); + seqno_to_time_mapping.Append(20, 600); + + ASSERT_OK(new_mem->Add(SequenceNumber(15), kTypeValuePreferredSeqno, "bar", + ValueWithWriteTime("bval", 500), + nullptr /*kv_prot_info*/)); + ASSERT_OK(new_mem->Add(SequenceNumber(18), kTypeValuePreferredSeqno, "foo", + ValueWithWriteTime("fval", 600), + nullptr /*kv_prot_info*/)); + + auto inserted_entries = mock::MakeMockFile(); + InternalKey smallest_internal_key("bar", SequenceNumber(15), + kTypeValuePreferredSeqno); + inserted_entries.push_back({smallest_internal_key.Encode().ToString(), + ValueWithPreferredSeqno("bval", 10)}); + InternalKey largest_internal_key("foo", SequenceNumber(18), kTypeValue); + inserted_entries.push_back( + {largest_internal_key.Encode().ToString(), "fval"}); + autovector to_delete; + new_mem->ConstructFragmentedRangeTombstones(); + cfd->imm()->Add(new_mem, &to_delete); + for (auto& m : to_delete) { + delete m; + } + + EventLogger event_logger(db_options_.info_log.get()); + SnapshotChecker* snapshot_checker = nullptr; // not relevant + FlushJob flush_job( + dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, + *cfd->GetLatestMutableCFOptions(), + std::numeric_limits::max() /* memtable_id */, env_options_, + versions_.get(), &mutex_, &shutting_down_, {}, kMaxSequenceNumber, + snapshot_checker, &job_context, FlushReason::kTest, nullptr, nullptr, + nullptr, kNoCompression, db_options_.statistics.get(), &event_logger, + true, true /* sync_output_directory */, true /* write_manifest */, + Env::Priority::USER, nullptr /*IOTracer*/, seqno_to_time_mapping); + + FileMetaData file_meta; + mutex_.Lock(); + flush_job.PickMemTable(); + ASSERT_OK(flush_job.Run(nullptr, &file_meta)); + mutex_.Unlock(); + + ASSERT_EQ(smallest_internal_key.Encode().ToString(), + file_meta.smallest.Encode().ToString()); + ASSERT_EQ(largest_internal_key.Encode().ToString(), + file_meta.largest.Encode().ToString()); + mock_table_factory_->AssertSingleFile(inserted_entries); + job_context.Clean(); +} + // Test parameters: // param 0): paranoid file check // param 1): user-defined timestamp test mode diff --git a/db/memtable.cc b/db/memtable.cc index 872c176b9ec..bddba53f3a0 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -983,7 +983,8 @@ static bool SaveValue(void* arg, const char* entry) { if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex || type == kTypeWideColumnEntity || type == kTypeDeletion || - type == kTypeSingleDeletion || type == kTypeDeletionWithTimestamp) && + type == kTypeSingleDeletion || type == kTypeDeletionWithTimestamp || + type == kTypeValuePreferredSeqno) && max_covering_tombstone_seq > seq) { type = kTypeRangeDeletion; } @@ -1035,13 +1036,18 @@ static bool SaveValue(void* arg, const char* entry) { return false; } - case kTypeValue: { + case kTypeValue: + case kTypeValuePreferredSeqno: { if (s->inplace_update_support) { s->mem->GetLock(s->key->user_key())->ReadLock(); } Slice v = GetLengthPrefixedSlice(key_ptr + key_length); + if (type == kTypeValuePreferredSeqno) { + v = ParsePackedValueForValue(v); + } + *(s->status) = Status::OK(); if (!s->do_merge) { diff --git a/db/memtable_list_test.cc b/db/memtable_list_test.cc index f32f6e1e073..bb4e44761ef 100644 --- a/db/memtable_list_test.cc +++ b/db/memtable_list_test.cc @@ -18,8 +18,17 @@ #include "test_util/testharness.h" #include "test_util/testutil.h" #include "util/string_util.h" +#include "utilities/merge_operators.h" namespace ROCKSDB_NAMESPACE { +namespace { +std::string ValueWithWriteTime(std::string value, uint64_t write_time) { + std::string result; + result = value; + PutFixed64(&result, write_time); + return result; +} +} // namespace class MemTableListTest : public testing::Test { public: @@ -255,6 +264,7 @@ TEST_F(MemTableListTest, GetTest) { InternalKeyComparator cmp(BytewiseComparator()); auto factory = std::make_shared(); options.memtable_factory = factory; + options.merge_operator = MergeOperators::CreateStringAppendOperator(); ImmutableOptions ioptions(options); WriteBufferManager wb(options.db_write_buffer_size); @@ -271,6 +281,9 @@ TEST_F(MemTableListTest, GetTest) { nullptr /* kv_prot_info */)); ASSERT_OK(mem->Add(++seq, kTypeValue, "key2", "value2.2", nullptr /* kv_prot_info */)); + ASSERT_OK(mem->Add(++seq, kTypeValuePreferredSeqno, "key3", + ValueWithWriteTime("value3.1", 20), + nullptr /* kv_prot_info */)); // Fetch the newly written keys merge_context.Clear(); @@ -297,7 +310,15 @@ TEST_F(MemTableListTest, GetTest) { ASSERT_TRUE(s.ok() && found); ASSERT_EQ(value, "value2.2"); - ASSERT_EQ(4, mem->num_entries()); + merge_context.Clear(); + found = mem->Get(LookupKey("key3", seq), &value, /*columns*/ nullptr, + /*timestamp*/ nullptr, &s, &merge_context, + &max_covering_tombstone_seq, ReadOptions(), + false /* immutable_memtable */); + ASSERT_TRUE(s.ok() && found); + ASSERT_EQ(value, "value3.1"); + + ASSERT_EQ(5, mem->num_entries()); ASSERT_EQ(1, mem->num_deletes()); // Add memtable to list @@ -318,6 +339,8 @@ TEST_F(MemTableListTest, GetTest) { mem2->Add(++seq, kTypeDeletion, "key1", "", nullptr /* kv_prot_info */)); ASSERT_OK(mem2->Add(++seq, kTypeValue, "key2", "value2.3", nullptr /* kv_prot_info */)); + ASSERT_OK(mem2->Add(++seq, kTypeMerge, "key3", "value3.2", + nullptr /* kv_prot_info */)); // Add second memtable to list // This is to make assert(memtable->IsFragmentedRangeTombstonesConstructed()) @@ -355,6 +378,14 @@ TEST_F(MemTableListTest, GetTest) { &max_covering_tombstone_seq, ReadOptions()); ASSERT_FALSE(found); + merge_context.Clear(); + found = + list.current()->Get(LookupKey("key3", seq), &value, /*columns=*/nullptr, + /*timestamp=*/nullptr, &s, &merge_context, + &max_covering_tombstone_seq, ReadOptions()); + ASSERT_TRUE(s.ok() && found); + ASSERT_EQ(value, "value3.1,value3.2"); + ASSERT_EQ(2, list.NumNotFlushed()); list.current()->Unref(&to_delete); diff --git a/db/merge_helper.cc b/db/merge_helper.cc index 703909010eb..2576aae840d 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -391,6 +391,18 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, stats_, clock_, /* update_num_ops_stats */ false, &op_failure_scope, &merge_result, /* result_operand */ nullptr, &merge_result_type); + } else if (ikey.type == kTypeValuePreferredSeqno) { + // When a TimedPut is merged with some merge operands, its original + // write time info is obsolete and removed, and the merge result is a + // kTypeValue. + Slice unpacked_value = ParsePackedValueForValue(iter->value()); + s = TimedFullMerge(user_merge_operator_, ikey.user_key, kPlainBaseValue, + unpacked_value, merge_context_.GetOperands(), + logger_, stats_, clock_, + /* update_num_ops_stats */ false, &op_failure_scope, + &merge_result, + /* result_operand */ nullptr, &merge_result_type); + } else if (ikey.type == kTypeBlobIndex) { BlobIndex blob_index; diff --git a/db/seqno_time_test.cc b/db/seqno_time_test.cc index 38aa645a88b..f08fb3a29f3 100644 --- a/db/seqno_time_test.cc +++ b/db/seqno_time_test.cc @@ -1620,6 +1620,27 @@ TEST_F(SeqnoTimeTest, EncodeDecodeMinimizeTimeGaps) { ASSERT_EQ(expected, seqs); } +TEST(PackValueAndSeqnoTest, Basic) { + std::string packed_value_buf; + Slice packed_value_slice = + PackValueAndWriteTime("foo", 30u, &packed_value_buf); + auto [unpacked_value, write_time] = + ParsePackedValueWithWriteTime(packed_value_slice); + ASSERT_EQ(unpacked_value, "foo"); + ASSERT_EQ(write_time, 30u); + ASSERT_EQ(ParsePackedValueForValue(packed_value_slice), "foo"); +} + +TEST(PackValueAndWriteTimeTest, Basic) { + std::string packed_value_buf; + Slice packed_value_slice = PackValueAndSeqno("foo", 30u, &packed_value_buf); + auto [unpacked_value, write_time] = + ParsePackedValueWithSeqno(packed_value_slice); + ASSERT_EQ(unpacked_value, "foo"); + ASSERT_EQ(write_time, 30u); + ASSERT_EQ(ParsePackedValueForValue(packed_value_slice), "foo"); +} + } // namespace ROCKSDB_NAMESPACE diff --git a/db/seqno_to_time_mapping.cc b/db/seqno_to_time_mapping.cc index ec547ff2a6e..07f28906b06 100644 --- a/db/seqno_to_time_mapping.cc +++ b/db/seqno_to_time_mapping.cc @@ -490,4 +490,45 @@ std::string SeqnoToTimeMapping::ToHumanString() const { return ret; } +Slice PackValueAndWriteTime(const Slice& value, uint64_t unix_write_time, + std::string* buf) { + buf->assign(value.data(), value.size()); + PutFixed64(buf, unix_write_time); + return Slice(*buf); +} + +Slice PackValueAndSeqno(const Slice& value, SequenceNumber seqno, + std::string* buf) { + buf->assign(value.data(), value.size()); + PutFixed64(buf, seqno); + return Slice(*buf); +} + +std::tuple ParsePackedValueWithWriteTime(const Slice& value) { + assert(value.size() >= sizeof(uint64_t)); + Slice write_time_slice(value.data() + value.size() - sizeof(uint64_t), + sizeof(uint64_t)); + uint64_t write_time; + [[maybe_unused]] auto res = GetFixed64(&write_time_slice, &write_time); + assert(res); + return std::make_tuple(Slice(value.data(), value.size() - sizeof(uint64_t)), + write_time); +} + +std::tuple ParsePackedValueWithSeqno( + const Slice& value) { + assert(value.size() >= sizeof(SequenceNumber)); + Slice seqno_slice(value.data() + value.size() - sizeof(uint64_t), + sizeof(uint64_t)); + SequenceNumber seqno; + [[maybe_unused]] auto res = GetFixed64(&seqno_slice, &seqno); + assert(res); + return std::make_tuple( + Slice(value.data(), value.size() - sizeof(SequenceNumber)), seqno); +} + +Slice ParsePackedValueForValue(const Slice& value) { + assert(value.size() >= sizeof(uint64_t)); + return Slice(value.data(), value.size() - sizeof(uint64_t)); +} } // namespace ROCKSDB_NAMESPACE diff --git a/db/seqno_to_time_mapping.h b/db/seqno_to_time_mapping.h index 0332d2f90b0..2aa7bc2aa85 100644 --- a/db/seqno_to_time_mapping.h +++ b/db/seqno_to_time_mapping.h @@ -265,4 +265,28 @@ class SeqnoToTimeMapping { pair_const_iterator FindGreaterEqSeqno(SequenceNumber seqno) const; }; +// === Utility methods used for TimedPut === // + +// Pack a value Slice and a unix write time into buffer `buf` and return a Slice +// for the packed value backed by `buf`. +Slice PackValueAndWriteTime(const Slice& value, uint64_t unix_write_time, + std::string* buf); + +// Pack a value Slice and a sequence number into buffer `buf` and return a Slice +// for the packed value backed by `buf`. +Slice PackValueAndSeqno(const Slice& value, SequenceNumber seqno, + std::string* buf); + +// Parse a packed value to get the value and the write time. The unpacked value +// Slice is backed up by the same memory backing up `value`. +std::tuple ParsePackedValueWithWriteTime(const Slice& value); + +// Parse a packed value to get the value and the sequence number. The unpacked +// value Slice is backed up by the same memory backing up `value`. +std::tuple ParsePackedValueWithSeqno(const Slice& value); + +// Parse a packed value to get the value. The unpacked value Slice is backed up +// by the same memory backing up `value`. +Slice ParsePackedValueForValue(const Slice& value); + } // namespace ROCKSDB_NAMESPACE diff --git a/db/write_batch.cc b/db/write_batch.cc index 26f9b7d85d2..05a592e94a9 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -90,6 +90,7 @@ enum ContentFlags : uint32_t { HAS_BLOB_INDEX = 1 << 10, HAS_BEGIN_UNPREPARE = 1 << 11, HAS_PUT_ENTITY = 1 << 12, + HAS_TIMED_PUT = 1 << 13, }; struct BatchContentClassifier : public WriteBatch::Handler { @@ -100,6 +101,11 @@ struct BatchContentClassifier : public WriteBatch::Handler { return Status::OK(); } + Status TimedPutCF(uint32_t, const Slice&, const Slice&, uint64_t) override { + content_flags |= ContentFlags::HAS_TIMED_PUT; + return Status::OK(); + } + Status PutEntityCF(uint32_t /* column_family_id */, const Slice& /* key */, const Slice& /* entity */) override { content_flags |= ContentFlags::HAS_PUT_ENTITY; @@ -305,6 +311,10 @@ bool WriteBatch::HasPut() const { return (ComputeContentFlags() & ContentFlags::HAS_PUT) != 0; } +bool WriteBatch::HasTimedPut() const { + return (ComputeContentFlags() & ContentFlags::HAS_TIMED_PUT) != 0; +} + bool WriteBatch::HasPutEntity() const { return (ComputeContentFlags() & ContentFlags::HAS_PUT_ENTITY) != 0; } @@ -360,7 +370,8 @@ bool WriteBatch::HasRollback() const { Status ReadRecordFromWriteBatch(Slice* input, char* tag, uint32_t* column_family, Slice* key, - Slice* value, Slice* blob, Slice* xid) { + Slice* value, Slice* blob, Slice* xid, + uint64_t* write_unix_time) { assert(key != nullptr && value != nullptr); *tag = (*input)[0]; input->remove_prefix(1); @@ -468,6 +479,18 @@ Status ReadRecordFromWriteBatch(Slice* input, char* tag, return Status::Corruption("bad WriteBatch PutEntity"); } break; + case kTypeColumnFamilyValuePreferredSeqno: + if (!GetVarint32(input, column_family)) { + return Status::Corruption("bad WriteBatch TimedPut"); + } + FALLTHROUGH_INTENDED; + case kTypeValuePreferredSeqno: + if (!GetLengthPrefixedSlice(input, key) || + !GetLengthPrefixedSlice(input, value) || + !GetFixed64(input, write_unix_time)) { + return Status::Corruption("bad WriteBatch TimedPut"); + } + break; default: return Status::Corruption("unknown WriteBatch tag"); } @@ -495,6 +518,7 @@ Status WriteBatchInternal::Iterate(const WriteBatch* wb, (begin == WriteBatchInternal::kHeader) && (end == wb->rep_.size()); Slice key, value, blob, xid; + uint64_t write_unix_time = 0; // Sometimes a sub-batch starts with a Noop. We want to exclude such Noops as // the batch boundary symbols otherwise we would mis-count the number of @@ -519,7 +543,7 @@ Status WriteBatchInternal::Iterate(const WriteBatch* wb, column_family = 0; // default s = ReadRecordFromWriteBatch(&input, &tag, &column_family, &key, &value, - &blob, &xid); + &blob, &xid, &write_unix_time); if (!s.ok()) { return s; } @@ -705,6 +729,16 @@ Status WriteBatchInternal::Iterate(const WriteBatch* wb, ++found; } break; + case kTypeValuePreferredSeqno: + case kTypeColumnFamilyValuePreferredSeqno: + assert(wb->content_flags_.load(std::memory_order_relaxed) & + (ContentFlags::DEFERRED | ContentFlags::HAS_TIMED_PUT)); + s = handler->TimedPutCF(column_family, key, value, write_unix_time); + if (LIKELY(s.ok())) { + empty_batch = false; + ++found; + } + break; default: return Status::Corruption("unknown WriteBatch tag"); } @@ -828,6 +862,46 @@ Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id, return save.commit(); } +Status WriteBatchInternal::TimedPut(WriteBatch* b, uint32_t column_family_id, + const Slice& key, const Slice& value, + uint64_t write_unix_time) { + if (key.size() > size_t{std::numeric_limits::max()}) { + return Status::InvalidArgument("key is too large"); + } + if (value.size() > size_t{std::numeric_limits::max()}) { + return Status::InvalidArgument("value is too large"); + } + LocalSavePoint save(b); + + WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); + if (column_family_id == 0) { + b->rep_.push_back(static_cast(kTypeValuePreferredSeqno)); + } else { + b->rep_.push_back(static_cast(kTypeColumnFamilyValuePreferredSeqno)); + PutVarint32(&b->rep_, column_family_id); + } + PutLengthPrefixedSlice(&b->rep_, key); + PutLengthPrefixedSlice(&b->rep_, value); + // For a kTypeValuePreferredSeqno entry, its write time is encoded separately + // from value in an encoded WriteBatch. They are packed into one value Slice + // once it's written to the database. + PutFixed64(&b->rep_, write_unix_time); + + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_TIMED_PUT, + std::memory_order_relaxed); + if (b->prot_info_ != nullptr) { + // See comment in other internal functions for why we don't need to + // differentiate between `kTypeValuePreferredSeqno` and + // `kTypeColumnFamilyValuePreferredSeqno` here. + b->prot_info_->entries_.emplace_back( + ProtectionInfo64() + .ProtectKVO(key, value, kTypeValuePreferredSeqno) + .ProtectC(column_family_id)); + } + return save.commit(); +} + Status WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { size_t ts_sz = 0; @@ -854,6 +928,26 @@ Status WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key, SliceParts(&value, 1)); } +Status WriteBatch::TimedPut(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value, uint64_t write_unix_time) { + size_t ts_sz = 0; + uint32_t cf_id = 0; + Status s; + + std::tie(s, cf_id, ts_sz) = + WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this, + column_family); + + if (!s.ok()) { + return s; + } else if (ts_sz != 0) { + return Status::NotSupported( + "TimedPut is not supported in combination with user-defined " + "timestamps."); + } + return WriteBatchInternal::TimedPut(this, cf_id, key, value, write_unix_time); +} + Status WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key, const Slice& ts, const Slice& value) { const Status s = CheckColumnFamilyTimestampSize(column_family, ts); @@ -1682,6 +1776,7 @@ Status WriteBatch::VerifyChecksum() const { Slice input(rep_.data() + WriteBatchInternal::kHeader, rep_.size() - WriteBatchInternal::kHeader); Slice key, value, blob, xid; + uint64_t unix_write_time = 0; char tag = 0; uint32_t column_family = 0; // default Status s; @@ -1694,7 +1789,7 @@ Status WriteBatch::VerifyChecksum() const { value.clear(); column_family = 0; s = ReadRecordFromWriteBatch(&input, &tag, &column_family, &key, &value, - &blob, &xid); + &blob, &xid, &unix_write_time); if (!s.ok()) { return s; } @@ -1744,6 +1839,10 @@ Status WriteBatch::VerifyChecksum() const { case kTypeWideColumnEntity: tag = kTypeWideColumnEntity; break; + case kTypeColumnFamilyValuePreferredSeqno: + case kTypeValuePreferredSeqno: + tag = kTypeValuePreferredSeqno; + break; default: return Status::Corruption( "unknown WriteBatch tag", @@ -2185,6 +2284,34 @@ class MemTableInserter : public WriteBatch::Handler { return ret_status; } + Status TimedPutCF(uint32_t column_family_id, const Slice& key, + const Slice& value, uint64_t unix_write_time) override { + const auto* kv_prot_info = NextProtectionInfo(); + Status ret_status; + std::string value_buf; + Slice packed_value = + PackValueAndWriteTime(value, unix_write_time, &value_buf); + if (kv_prot_info != nullptr) { + auto mem_kv_prot_info = + kv_prot_info->StripC(column_family_id).ProtectS(sequence_); + ret_status = PutCFImpl(column_family_id, key, packed_value, + kTypeValuePreferredSeqno, &mem_kv_prot_info); + } else { + ret_status = + PutCFImpl(column_family_id, key, packed_value, + kTypeValuePreferredSeqno, nullptr /* kv_prot_info */); + } + + // TODO: this assumes that if TryAgain status is returned to the caller, + // The operation is actually tried again. The proper way to do this is to + // pass a `try_again` parameter to the operation itself and decrement + // prot_info_idx_ based on that. + if (UNLIKELY(ret_status.IsTryAgain())) { + DecrementProtectionInfoIdxForTryAgain(); + } + return ret_status; + } + Status PutEntityCF(uint32_t column_family_id, const Slice& key, const Slice& value) override { const auto* kv_prot_info = NextProtectionInfo(); @@ -3029,6 +3156,11 @@ class ProtectionInfoUpdater : public WriteBatch::Handler { return UpdateProtInfo(cf, key, val, kTypeValue); } + Status TimedPutCF(uint32_t cf, const Slice& key, const Slice& val, + uint64_t /*unix_write_time*/) override { + return UpdateProtInfo(cf, key, val, kTypeValuePreferredSeqno); + } + Status PutEntityCF(uint32_t cf, const Slice& key, const Slice& entity) override { return UpdateProtInfo(cf, key, entity, kTypeWideColumnEntity); diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 52bbe4545b4..e3388d2fc59 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -87,6 +87,10 @@ class WriteBatchInternal { static Status Put(WriteBatch* batch, uint32_t column_family_id, const SliceParts& key, const SliceParts& value); + static Status TimedPut(WriteBatch* batch, uint32_t column_family_id, + const Slice& key, const Slice& value, + uint64_t unix_write_time); + static Status PutEntity(WriteBatch* batch, uint32_t column_family_id, const Slice& key, const WideColumns& columns); diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index 00faea4ce46..623adb76dd7 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -48,6 +48,7 @@ static std::string PrintContents(WriteBatch* b, WriteBatchInternal::InsertInto(b, &cf_mems_default, nullptr, nullptr); uint32_t count = 0; int put_count = 0; + int timed_put_count = 0; int delete_count = 0; int single_delete_count = 0; int delete_range_count = 0; @@ -116,6 +117,20 @@ static std::string PrintContents(WriteBatch* b, count++; merge_count++; break; + case kTypeValuePreferredSeqno: { + state.append("TimedPut("); + state.append(ikey.user_key.ToString()); + state.append(", "); + auto [unpacked_value, unix_write_time] = + ParsePackedValueWithWriteTime(iter->value()); + state.append(unpacked_value.ToString()); + state.append(", "); + state.append(std::to_string(unix_write_time)); + state.append(")"); + count++; + timed_put_count++; + break; + } default: assert(false); break; @@ -127,6 +142,7 @@ static std::string PrintContents(WriteBatch* b, } if (s.ok()) { EXPECT_EQ(b->HasPut(), put_count > 0); + EXPECT_EQ(b->HasTimedPut(), timed_put_count > 0); EXPECT_EQ(b->HasDelete(), delete_count > 0); EXPECT_EQ(b->HasSingleDelete(), single_delete_count > 0); EXPECT_EQ(b->HasDeleteRange(), delete_range_count > 0); @@ -278,6 +294,18 @@ struct TestHandler : public WriteBatch::Handler { } return Status::OK(); } + Status TimedPutCF(uint32_t column_family_id, const Slice& key, + const Slice& value, uint64_t unix_write_time) override { + if (column_family_id == 0) { + seen += "TimedPut(" + key.ToString() + ", " + value.ToString() + ", " + + std::to_string(unix_write_time) + ")"; + } else { + seen += "TimedPutCF(" + std::to_string(column_family_id) + ", " + + key.ToString() + ", " + value.ToString() + ", " + + std::to_string(unix_write_time) + ")"; + } + return Status::OK(); + } Status PutEntityCF(uint32_t column_family_id, const Slice& key, const Slice& entity) override { std::ostringstream oss; @@ -374,6 +402,17 @@ TEST_F(WriteBatchTest, PutNotImplemented) { ASSERT_OK(batch.Iterate(&handler)); } +TEST_F(WriteBatchTest, TimedPutNotImplemented) { + WriteBatch batch; + ASSERT_OK( + batch.TimedPut(0, Slice("k1"), Slice("v1"), /*unix_write_time=*/30)); + ASSERT_EQ(1u, batch.Count()); + ASSERT_EQ("TimedPut(k1, v1, 30)@0", PrintContents(&batch)); + + WriteBatch::Handler handler; + ASSERT_TRUE(batch.Iterate(&handler).IsInvalidArgument()); +} + TEST_F(WriteBatchTest, DeleteNotImplemented) { WriteBatch batch; ASSERT_OK(batch.Delete(Slice("k2"))); @@ -770,9 +809,8 @@ TEST_F(WriteBatchTest, ColumnFamiliesBatchTest) { ASSERT_OK(batch.Merge(&three, Slice("threethree"), Slice("3three"))); ASSERT_OK(batch.Put(&zero, Slice("foo"), Slice("bar"))); ASSERT_OK(batch.Merge(Slice("omom"), Slice("nom"))); - // TODO(yuzhangyu): implement this. - ASSERT_TRUE( - batch.TimedPut(&zero, Slice("foo"), Slice("bar"), 0u).IsNotSupported()); + ASSERT_OK(batch.TimedPut(&zero, Slice("foo"), Slice("bar"), + /*write_unix_time*/ 0u)); TestHandler handler; ASSERT_OK(batch.Iterate(&handler)); @@ -785,7 +823,8 @@ TEST_F(WriteBatchTest, ColumnFamiliesBatchTest) { "DeleteRangeCF(2, 3foo, 4foo)" "MergeCF(3, threethree, 3three)" "Put(foo, bar)" - "Merge(omom, nom)", + "Merge(omom, nom)" + "TimedPut(foo, bar, 0)", handler.seen); } diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h index 5c87f940581..dfdd2834bf9 100644 --- a/include/rocksdb/write_batch.h +++ b/include/rocksdb/write_batch.h @@ -101,15 +101,12 @@ class WriteBatch : public WriteBatchBase { } using WriteBatchBase::TimedPut; - // DO NOT USE, UNDER CONSTRUCTION + // EXPERIMENTAL // Stores the mapping "key->value" in the database with the specified write - // time in the column family. - Status TimedPut(ColumnFamilyHandle* /* column_family */, - const Slice& /* key */, const Slice& /* value */, - uint64_t /* write_unix_time */) override { - // TODO(yuzhangyu): implement take in the write time. - return Status::NotSupported("TimedPut is under construction"); - } + // time in the column family. Also see documentation in + // `WriteBatchBase::TimedPut` for the API's usage and limitations. + Status TimedPut(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value, uint64_t write_unix_time) override; // Store the mapping "key->{column1:value1, column2:value2, ...}" in the // column family specified by "column_family". @@ -259,6 +256,13 @@ class WriteBatch : public WriteBatchBase { // If user-defined timestamp is enabled, then `key` includes timestamp. virtual void Put(const Slice& /*key*/, const Slice& /*value*/) {} + // If user-defined timestamp is enabled, then `key` includes timestamp. + virtual Status TimedPutCF(uint32_t /*column_family_id*/, + const Slice& /*key*/, const Slice& /*value*/, + uint64_t /*write_time*/) { + return Status::InvalidArgument("TimedPutCF not implemented"); + } + // If user-defined timestamp is enabled, then `key` includes timestamp. virtual Status PutEntityCF(uint32_t /* column_family_id */, const Slice& /* key */, @@ -384,6 +388,9 @@ class WriteBatch : public WriteBatchBase { // Returns true if PutCF will be called during Iterate bool HasPut() const; + // Returns true if TimedPutCF will be called during Iterate + bool HasTimedPut() const; + // Returns true if PutEntityCF will be called during Iterate bool HasPutEntity() const; diff --git a/include/rocksdb/write_batch_base.h b/include/rocksdb/write_batch_base.h index 5b26ee543b5..2bf6a3c424a 100644 --- a/include/rocksdb/write_batch_base.h +++ b/include/rocksdb/write_batch_base.h @@ -42,6 +42,7 @@ class WriteBatchBase { const SliceParts& value); virtual Status Put(const SliceParts& key, const SliceParts& value); + // EXPERIMENTAL // Store the mapping "key->value" in the database with the specified write // time in the column family. Using some write time that is in the past to // fast track data to their correct placement and preservation is the intended @@ -49,10 +50,9 @@ class WriteBatchBase { // as having the given write time for this purpose but doesn't currently make // any guarantees. // - // When a regular Put("foo", "v1") is followed by a - // TimedPut("foo", "v2", some_time_before_first_put), the behavior of read - // queries are undefined and can change over time, for example due to - // compactions. + // This feature is experimental and one known side effect is that it can break + // snapshot immutability. Reading from a snapshot created before + // TimedPut(k, v, t) may or may not see that k->v. // Note: this feature is currently not compatible with user-defined timestamps // and wide columns. virtual Status TimedPut(ColumnFamilyHandle* column_family, const Slice& key, diff --git a/table/block_based/block.cc b/table/block_based/block.cc index bc18dd926da..ea4d559a2a4 100644 --- a/table/block_based/block.cc +++ b/table/block_based/block.cc @@ -335,11 +335,12 @@ void MetaBlockIter::SeekImpl(const Slice& target) { // target = "seek_user_key @ type | seqno". // // For any type other than kTypeValue, kTypeDeletion, kTypeSingleDeletion, -// kTypeBlobIndex, kTypeWideColumnEntity or kTypeMerge, this function behaves -// identically to Seek(). +// kTypeBlobIndex, kTypeWideColumnEntity, kTypeValuePreferredSeqno or +// kTypeMerge, this function behaves identically to Seek(). // // For any type in kTypeValue, kTypeDeletion, kTypeSingleDeletion, -// kTypeBlobIndex, kTypeWideColumnEntity, or kTypeMerge: +// kTypeBlobIndex, kTypeWideColumnEntity, kTypeValuePreferredSeqno or +// kTypeMerge: // // If the return value is FALSE, iter location is undefined, and it means: // 1) there is no key in this block falling into the range: @@ -452,7 +453,8 @@ bool DataBlockIter::SeekForGetImpl(const Slice& target) { value_type != ValueType::kTypeMerge && value_type != ValueType::kTypeSingleDeletion && value_type != ValueType::kTypeBlobIndex && - value_type != ValueType::kTypeWideColumnEntity) { + value_type != ValueType::kTypeWideColumnEntity && + value_type != ValueType::kTypeValuePreferredSeqno) { SeekImpl(target); } diff --git a/table/get_context.cc b/table/get_context.cc index 7dafbd7d409..9bb43783616 100644 --- a/table/get_context.cc +++ b/table/get_context.cc @@ -276,8 +276,10 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, appendToReplayLog(parsed_key.type, value, ts); auto type = parsed_key.type; + Slice unpacked_value = value; // Key matches. Process it - if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex || + if ((type == kTypeValue || type == kTypeValuePreferredSeqno || + type == kTypeMerge || type == kTypeBlobIndex || type == kTypeWideColumnEntity || type == kTypeDeletion || type == kTypeDeletionWithTimestamp || type == kTypeSingleDeletion) && max_covering_tombstone_seq_ != nullptr && @@ -289,9 +291,13 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, } switch (type) { case kTypeValue: + case kTypeValuePreferredSeqno: case kTypeBlobIndex: case kTypeWideColumnEntity: assert(state_ == kNotFound || state_ == kMerge); + if (type == kTypeValuePreferredSeqno) { + unpacked_value = ParsePackedValueForValue(value); + } if (type == kTypeBlobIndex) { if (is_blob_index_ == nullptr) { // Blob value not supported. Stop. @@ -311,10 +317,10 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, ukey_with_ts_found_.PinSelf(parsed_key.user_key); } if (LIKELY(pinnable_val_ != nullptr)) { - Slice value_to_use = value; + Slice value_to_use = unpacked_value; if (type == kTypeWideColumnEntity) { - Slice value_copy = value; + Slice value_copy = unpacked_value; if (!WideColumnSerialization::GetValueOfDefaultColumn( value_copy, value_to_use) @@ -335,12 +341,13 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, } } else if (columns_ != nullptr) { if (type == kTypeWideColumnEntity) { - if (!columns_->SetWideColumnValue(value, value_pinner).ok()) { + if (!columns_->SetWideColumnValue(unpacked_value, value_pinner) + .ok()) { state_ = kCorrupt; return false; } } else { - columns_->SetPlainValue(value, value_pinner); + columns_->SetPlainValue(unpacked_value, value_pinner); } } } else { @@ -349,13 +356,14 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, // merge_context_->operand_list if (type == kTypeBlobIndex) { PinnableSlice pin_val; - if (GetBlobValue(parsed_key.user_key, value, &pin_val) == false) { + if (GetBlobValue(parsed_key.user_key, unpacked_value, &pin_val) == + false) { return false; } Slice blob_value(pin_val); push_operand(blob_value, nullptr); } else if (type == kTypeWideColumnEntity) { - Slice value_copy = value; + Slice value_copy = unpacked_value; Slice value_of_default; if (!WideColumnSerialization::GetValueOfDefaultColumn( @@ -367,15 +375,16 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, push_operand(value_of_default, value_pinner); } else { - assert(type == kTypeValue); - push_operand(value, value_pinner); + assert(type == kTypeValue || type == kTypeValuePreferredSeqno); + push_operand(unpacked_value, value_pinner); } } } else if (kMerge == state_) { assert(merge_operator_ != nullptr); if (type == kTypeBlobIndex) { PinnableSlice pin_val; - if (GetBlobValue(parsed_key.user_key, value, &pin_val) == false) { + if (GetBlobValue(parsed_key.user_key, unpacked_value, &pin_val) == + false) { return false; } Slice blob_value(pin_val); @@ -392,12 +401,12 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, state_ = kFound; if (do_merge_) { - MergeWithWideColumnBaseValue(value); + MergeWithWideColumnBaseValue(unpacked_value); } else { // It means this function is called as part of DB GetMergeOperands // API and the current value should be part of // merge_context_->operand_list - Slice value_copy = value; + Slice value_copy = unpacked_value; Slice value_of_default; if (!WideColumnSerialization::GetValueOfDefaultColumn( @@ -410,16 +419,16 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, push_operand(value_of_default, value_pinner); } } else { - assert(type == kTypeValue); + assert(type == kTypeValue || type == kTypeValuePreferredSeqno); state_ = kFound; if (do_merge_) { - MergeWithPlainBaseValue(value); + MergeWithPlainBaseValue(unpacked_value); } else { // It means this function is called as part of DB GetMergeOperands // API and the current value should be part of // merge_context_->operand_list - push_operand(value, value_pinner); + push_operand(unpacked_value, value_pinner); } } } diff --git a/table/sst_file_dumper.cc b/table/sst_file_dumper.cc index ebc44a20fd1..d201163808d 100644 --- a/table/sst_file_dumper.cc +++ b/table/sst_file_dumper.cc @@ -527,6 +527,13 @@ Status SstFileDumper::ReadSequential(bool print_kv, uint64_t read_num_limit, fprintf(stdout, "%s => %s\n", ikey.DebugString(true, output_hex_).c_str(), oss.str().c_str()); + } else if (ikey.type == kTypeValuePreferredSeqno) { + auto [unpacked_value, preferred_seqno] = + ParsePackedValueWithSeqno(value); + fprintf(stdout, "%s => %s, %llu\n", + ikey.DebugString(true, output_hex_).c_str(), + unpacked_value.ToString(output_hex_).c_str(), + static_cast(preferred_seqno)); } else { fprintf(stdout, "%s => %s\n", ikey.DebugString(true, output_hex_).c_str(), diff --git a/util/write_batch_util.h b/util/write_batch_util.h index 70bbad9fc78..6986d25d0d0 100644 --- a/util/write_batch_util.h +++ b/util/write_batch_util.h @@ -32,6 +32,11 @@ class ColumnFamilyCollector : public WriteBatch::Handler { return AddColumnFamilyId(column_family_id); } + Status TimedPutCF(uint32_t column_family_id, const Slice&, const Slice&, + uint64_t) override { + return AddColumnFamilyId(column_family_id); + } + Status DeleteCF(uint32_t column_family_id, const Slice&) override { return AddColumnFamilyId(column_family_id); } diff --git a/utilities/debug.cc b/utilities/debug.cc index 911bc510a6a..c7599797dad 100644 --- a/utilities/debug.cc +++ b/utilities/debug.cc @@ -38,7 +38,11 @@ static std::unordered_map value_type_string_map = { {"TypeCommitXIDAndTimestamp", ValueType::kTypeCommitXIDAndTimestamp}, {"TypeWideColumnEntity", ValueType::kTypeWideColumnEntity}, {"TypeColumnFamilyWideColumnEntity", - ValueType::kTypeColumnFamilyWideColumnEntity}}; + ValueType::kTypeColumnFamilyWideColumnEntity}, + {"TypeValuePreferredSeqno", ValueType::kTypeValuePreferredSeqno}, + {"TypeColumnFamilyValuePreferredSeqno", + ValueType::kTypeColumnFamilyValuePreferredSeqno}, +}; std::string KeyVersion::GetTypeName() const { std::string type_name; diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc index bc4089e8913..0011401900e 100644 --- a/utilities/write_batch_with_index/write_batch_with_index.cc +++ b/utilities/write_batch_with_index/write_batch_with_index.cc @@ -203,13 +203,14 @@ Status WriteBatchWithIndex::Rep::ReBuildIndex() { while (s.ok() && !input.empty()) { Slice key, value, blob, xid; uint32_t column_family_id = 0; // default + uint64_t unix_write_time = 0; char tag = 0; // set offset of current entry for call to AddNewEntry() last_entry_offset = input.data() - write_batch.Data().data(); s = ReadRecordFromWriteBatch(&input, &tag, &column_family_id, &key, &value, - &blob, &xid); + &blob, &xid, &unix_write_time); if (!s.ok()) { break; } @@ -263,6 +264,12 @@ Status WriteBatchWithIndex::Rep::ReBuildIndex() { AddNewEntry(column_family_id); } break; + case kTypeColumnFamilyValuePreferredSeqno: + case kTypeValuePreferredSeqno: + // TimedPut is not supported in Transaction APIs. + return Status::Corruption( + "unexpected WriteBatch tag in ReBuildIndex", + std::to_string(static_cast(tag))); default: return Status::Corruption( "unknown WriteBatch tag in ReBuildIndex", diff --git a/utilities/write_batch_with_index/write_batch_with_index_internal.cc b/utilities/write_batch_with_index/write_batch_with_index_internal.cc index 8789cc9b18e..2855f5b696e 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_internal.cc +++ b/utilities/write_batch_with_index/write_batch_with_index_internal.cc @@ -553,9 +553,10 @@ Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset, } Slice input = Slice(rep_.data() + data_offset, rep_.size() - data_offset); char tag; - uint32_t column_family; + uint32_t column_family = 0; // default + uint64_t unix_write_time = 0; Status s = ReadRecordFromWriteBatch(&input, &tag, &column_family, key, value, - blob, xid); + blob, xid, &unix_write_time); if (!s.ok()) { return s; } @@ -598,6 +599,11 @@ Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset, *type = kPutEntityRecord; break; } + case kTypeColumnFamilyValuePreferredSeqno: + case kTypeValuePreferredSeqno: + // TimedPut is not supported in Transaction APIs. + return Status::Corruption("unexpected WriteBatch tag ", + std::to_string(static_cast(tag))); default: return Status::Corruption("unknown WriteBatch tag ", std::to_string(static_cast(tag)));