Skip to content

Commit

Permalink
Add initial support for TimedPut API (facebook#12419)
Browse files Browse the repository at this point in the history
Summary:
This PR adds support for `TimedPut` API. We introduced a new type `kTypeValuePreferredSeqno` for entries added to the DB via the `TimedPut` API.

The life cycle of such an entry on the write/flush/compaction paths are:

1) It is initially added to memtable as:
`<user_key, seq, kTypeValuePreferredSeqno>: {value, write_unix_time}`

2) When it's flushed to L0 sst files, it's converted to:
`<user_key, seq, kTypeValuePreferredSeqno>: {value, preferred_seqno}`
 when we have easy access to the seqno to time mapping.

3) During compaction, if certain conditions are met, we swap in the `preferred_seqno` and the entry will become:
`<user_key, preferred_seqno, kTypeValue>: value`. This step helps fast track these entries to the cold tier if they are eligible after the sequence number swap.

On the read path:
A `kTypeValuePreferredSeqno` entry acts the same as a `kTypeValue` entry, the unix_write_time/preferred seqno part packed in value is completely ignored.

Needed follow ups:
1) The seqno to time mapping accessible in flush needs to be extended to cover the `write_unix_time` for possible `kTypeValuePreferredSeqno` entries. This also means we need to track these `write_unix_time` in memtable.

2) Compaction filter support for the new `kTypeValuePreferredSeqno` type for feature parity with other `kTypeValue` and equivalent types.

3) Stress test coverage for the feature

Pull Request resolved: facebook#12419

Test Plan: Added unit tests

Reviewed By: pdillinger

Differential Revision: D54920296

Pulled By: jowlyzhang

fbshipit-source-id: c8b43f7a7c465e569141770e93c748371ff1da9e
  • Loading branch information
jowlyzhang authored and facebook-github-bot committed Mar 14, 2024
1 parent f77b788 commit 1104eaa
Show file tree
Hide file tree
Showing 33 changed files with 1,057 additions and 142 deletions.
33 changes: 27 additions & 6 deletions db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -211,12 +211,16 @@ Status BuildTable(
ts_sz > 0 && !ioptions.persist_user_defined_timestamps;

std::string key_after_flush_buf;
std::string value_buf;
c_iter.SeekToFirst();
for (; c_iter.Valid(); c_iter.Next()) {
const Slice& key = c_iter.key();
const Slice& value = c_iter.value();
const ParsedInternalKey& ikey = c_iter.ikey();
Slice key_after_flush = key;
ParsedInternalKey ikey = c_iter.ikey();
key_after_flush_buf.assign(key.data(), key.size());
Slice key_after_flush = key_after_flush_buf;
Slice value_after_flush = value;

// If user defined timestamps will be stripped from user key after flush,
// the in memory version of the key act logically the same as one with a
// minimum timestamp. We update the timestamp here so file boundary and
Expand All @@ -227,17 +231,34 @@ Status BuildTable(
key_after_flush = key_after_flush_buf;
}

if (ikey.type == kTypeValuePreferredSeqno) {
auto [unpacked_value, unix_write_time] =
ParsePackedValueWithWriteTime(value);
SequenceNumber preferred_seqno =
seqno_to_time_mapping.GetProximalSeqnoBeforeTime(unix_write_time);
if (preferred_seqno < ikey.sequence) {
value_after_flush =
PackValueAndSeqno(unpacked_value, preferred_seqno, &value_buf);
} else {
// Cannot get a useful preferred seqno, convert it to a kTypeValue.
UpdateInternalKey(&key_after_flush_buf, ikey.sequence, kTypeValue);
ikey = ParsedInternalKey(ikey.user_key, ikey.sequence, kTypeValue);
key_after_flush = key_after_flush_buf;
value_after_flush = ParsePackedValueForValue(value);
}
}

// Generate a rolling 64-bit hash of the key and values
// Note :
// Here "key" integrates 'sequence_number'+'kType'+'user key'.
s = output_validator.Add(key_after_flush, value);
s = output_validator.Add(key_after_flush, value_after_flush);
if (!s.ok()) {
break;
}
builder->Add(key_after_flush, value);
builder->Add(key_after_flush, value_after_flush);

s = meta->UpdateBoundaries(key_after_flush, value, ikey.sequence,
ikey.type);
s = meta->UpdateBoundaries(key_after_flush, value_after_flush,
ikey.sequence, ikey.type);
if (!s.ok()) {
break;
}
Expand Down
7 changes: 7 additions & 0 deletions db/compaction/compaction_iteration_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ struct CompactionIterationStats {
uint64_t total_blob_bytes_read = 0;
uint64_t num_blobs_relocated = 0;
uint64_t total_blob_bytes_relocated = 0;

// TimedPut diagnostics
// Total number of kTypeValuePreferredSeqno records encountered.
uint64_t num_input_timed_put_records = 0;
// Number of kTypeValuePreferredSeqno records we ended up swapping in
// preferred seqno.
uint64_t num_timed_put_swap_preferred_seqno = 0;
};

} // namespace ROCKSDB_NAMESPACE
55 changes: 52 additions & 3 deletions db/compaction/compaction_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,8 @@ void CompactionIterator::NextFromInput() {
if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion ||
ikey_.type == kTypeDeletionWithTimestamp) {
iter_stats_.num_input_deletion_records++;
} else if (ikey_.type == kTypeValuePreferredSeqno) {
iter_stats_.num_input_timed_put_records++;
}
iter_stats_.total_input_raw_key_bytes += key_.size();
iter_stats_.total_input_raw_value_bytes += value_.size();
Expand Down Expand Up @@ -618,7 +620,8 @@ void CompactionIterator::NextFromInput() {
// not compact out. We will keep this Put, but can drop it's data.
// (See Optimization 3, below.)
if (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex &&
ikey_.type != kTypeWideColumnEntity) {
ikey_.type != kTypeWideColumnEntity &&
ikey_.type != kTypeValuePreferredSeqno) {
ROCKS_LOG_FATAL(info_log_, "Unexpected key %s for compaction output",
ikey_.DebugString(allow_data_in_errors_, true).c_str());
assert(false);
Expand All @@ -632,7 +635,8 @@ void CompactionIterator::NextFromInput() {
assert(false);
}

if (ikey_.type == kTypeBlobIndex || ikey_.type == kTypeWideColumnEntity) {
if (ikey_.type == kTypeBlobIndex || ikey_.type == kTypeWideColumnEntity ||
ikey_.type == kTypeValuePreferredSeqno) {
ikey_.type = kTypeValue;
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
}
Expand Down Expand Up @@ -798,7 +802,8 @@ void CompactionIterator::NextFromInput() {
// happened
if (next_ikey.type != kTypeValue &&
next_ikey.type != kTypeBlobIndex &&
next_ikey.type != kTypeWideColumnEntity) {
next_ikey.type != kTypeWideColumnEntity &&
next_ikey.type != kTypeValuePreferredSeqno) {
++iter_stats_.num_single_del_mismatch;
}

Expand Down Expand Up @@ -968,6 +973,50 @@ void CompactionIterator::NextFromInput() {
validity_info_.SetValid(ValidContext::kKeepDel);
at_next_ = true;
}
} else if (ikey_.type == kTypeValuePreferredSeqno &&
DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
(bottommost_level_ ||
(compaction_ != nullptr &&
compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
&level_ptrs_)))) {
// This section that attempts to swap preferred sequence number will not
// be invoked if this is a CompactionIterator created for flush, since
// `compaction_` will be nullptr and it's not bottommost either.
//
// The entries with the same user key and smaller sequence numbers are
// all in this earliest snapshot range to be iterated. Since those entries
// will be hidden by this entry [rule A], it's safe to swap in the
// preferred seqno now.
//
// It's otherwise not safe to swap in the preferred seqno since it's
// possible for entries in earlier snapshots to have sequence number that
// is smaller than this entry's sequence number but bigger than this
// entry's preferred sequence number. Swapping in the preferred sequence
// number will break the internal key ordering invariant for this key.
//
// A special case involving range deletion is handled separately below.
auto [unpacked_value, preferred_seqno] =
ParsePackedValueWithSeqno(value_);
assert(preferred_seqno < ikey_.sequence);
InternalKey ikey_after_swap(ikey_.user_key, preferred_seqno, kTypeValue);
Slice ikey_after_swap_slice(*ikey_after_swap.rep());
if (range_del_agg_->ShouldDelete(
ikey_after_swap_slice,
RangeDelPositioningMode::kForwardTraversal)) {
// A range tombstone that doesn't cover this kTypeValuePreferredSeqno
// entry may end up covering the entry, so it's not safe to swap
// preferred sequence number. In this case, we output the entry as is.
validity_info_.SetValid(ValidContext::kNewUserKey);
} else {
iter_stats_.num_timed_put_swap_preferred_seqno++;
ikey_.sequence = preferred_seqno;
ikey_.type = kTypeValue;
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
key_ = current_key_.GetInternalKey();
ikey_.user_key = current_key_.GetUserKey();
value_ = unpacked_value;
validity_info_.SetValid(ValidContext::kSwapPreferredSeqno);
}
} else if (ikey_.type == kTypeMerge) {
if (!merge_helper_->HasOperator()) {
status_ = Status::InvalidArgument(
Expand Down
1 change: 1 addition & 0 deletions db/compaction/compaction_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ class CompactionIterator {
kKeepDel = 9,
kNewUserKey = 10,
kRangeDeletion = 11,
kSwapPreferredSeqno = 12,
};

struct ValidityInfo {
Expand Down
Loading

0 comments on commit 1104eaa

Please sign in to comment.