Skip to content

Commit

Permalink
Support returning write unix time in iterator property (facebook#12428)
Browse files Browse the repository at this point in the history
Summary:
This PR adds support to return data's approximate unix write time in the iterator property API. The general implementation is:
1) If the entry comes from an SST file, the sequence number to time mapping recorded in that file's table properties is used to deduce the entry's write time from its sequence number. If no such recording is available, `std::numeric_limits<uint64_t>::max()` is returned to indicate that the write time is unknown, except when the entry's sequence number is zero, in which case 0 is returned. This also means that even though `preclude_last_level_data_seconds` and `preserve_internal_time_seconds` can be toggled off between DB reopens, as long as the SST file's table properties have the mapping available, the entry's write time can still be deduced and returned.

2) If the entry comes from a memtable, the DB's sequence number to write time mapping is used in the same way. A copy of the DB's seqno to write time mapping is kept in `SuperVersion` to give iterators lock-free access. This also means a new `SuperVersion` is installed each time the DB's seqno to time mapping is updated, as originally proposed by Peter in facebook#11928. Similarly, if the feature is not enabled, `std::numeric_limits<uint64_t>::max()` is returned to indicate that the write time is unknown.

Needed follow up:
1) The write time for `kTypeValuePreferredSeqno` should be special-cased: it is already specified by the user, so it can be returned directly.

2) The flush job can be updated to use the copy of the DB's seqno to time mapping held in the SuperVersion.

3) Handle the case when `TimedPut` is called with a write time that is `std::numeric_limits<uint64_t>::max()`. We can make it a regular `Put`.

Pull Request resolved: facebook#12428

Test Plan: Added unit test

Reviewed By: pdillinger

Differential Revision: D54967067

Pulled By: jowlyzhang

fbshipit-source-id: c795b1b7ec142e09e53f2ed3461cf719833cb37a
  • Loading branch information
jowlyzhang authored and facebook-github-bot committed Mar 15, 2024
1 parent 4d5ebad commit f2546b6
Show file tree
Hide file tree
Showing 29 changed files with 510 additions and 47 deletions.
21 changes: 15 additions & 6 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -477,13 +477,16 @@ void SuperVersion::Cleanup() {
cfd->UnrefAndTryDelete();
}

void SuperVersion::Init(ColumnFamilyData* new_cfd, MemTable* new_mem,
MemTableListVersion* new_imm, Version* new_current) {
void SuperVersion::Init(
ColumnFamilyData* new_cfd, MemTable* new_mem, MemTableListVersion* new_imm,
Version* new_current,
std::shared_ptr<const SeqnoToTimeMapping> new_seqno_to_time_mapping) {
cfd = new_cfd;
mem = new_mem;
imm = new_imm;
current = new_current;
full_history_ts_low = cfd->GetFullHistoryTsLow();
seqno_to_time_mapping = std::move(new_seqno_to_time_mapping);
cfd->Ref();
mem->Ref();
imm->Ref();
Expand Down Expand Up @@ -1196,9 +1199,10 @@ Status ColumnFamilyData::RangesOverlapWithMemtables(
ReadOptions read_opts;
read_opts.total_order_seek = true;
MergeIteratorBuilder merge_iter_builder(&internal_comparator_, &arena);
merge_iter_builder.AddIterator(
super_version->mem->NewIterator(read_opts, &arena));
super_version->imm->AddIterators(read_opts, &merge_iter_builder,
merge_iter_builder.AddIterator(super_version->mem->NewIterator(
read_opts, /*seqno_to_time_mapping=*/nullptr, &arena));
super_version->imm->AddIterators(read_opts, /*seqno_to_time_mapping=*/nullptr,
&merge_iter_builder,
false /* add_range_tombstone_iter */);
ScopedArenaIterator memtable_iter(merge_iter_builder.Finish());

Expand Down Expand Up @@ -1336,7 +1340,12 @@ void ColumnFamilyData::InstallSuperVersion(
const MutableCFOptions& mutable_cf_options) {
SuperVersion* new_superversion = sv_context->new_superversion.release();
new_superversion->mutable_cf_options = mutable_cf_options;
new_superversion->Init(this, mem_, imm_.current(), current_);
new_superversion->Init(this, mem_, imm_.current(), current_,
sv_context->new_seqno_to_time_mapping
? std::move(sv_context->new_seqno_to_time_mapping)
: super_version_
? super_version_->ShareSeqnoToTimeMapping()
: nullptr);
SuperVersion* old_superversion = super_version_;
super_version_ = new_superversion;
if (old_superversion == nullptr || old_superversion->current != current() ||
Expand Down
23 changes: 21 additions & 2 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "rocksdb/env.h"
#include "rocksdb/options.h"
#include "trace_replay/block_cache_tracer.h"
#include "util/cast_util.h"
#include "util/hash_containers.h"
#include "util/thread_local.h"

Expand Down Expand Up @@ -219,6 +220,9 @@ struct SuperVersion {
// enable UDT feature, this is an empty string.
std::string full_history_ts_low;

// A shared copy of the DB's seqno to time mapping.
std::shared_ptr<const SeqnoToTimeMapping> seqno_to_time_mapping{nullptr};

// should be called outside the mutex
SuperVersion() = default;
~SuperVersion();
Expand All @@ -232,8 +236,23 @@ struct SuperVersion {
// that needs to be deleted in to_delete vector. Unrefing those
// objects needs to be done in the mutex
void Cleanup();
void Init(ColumnFamilyData* new_cfd, MemTable* new_mem,
MemTableListVersion* new_imm, Version* new_current);
void Init(
ColumnFamilyData* new_cfd, MemTable* new_mem,
MemTableListVersion* new_imm, Version* new_current,
std::shared_ptr<const SeqnoToTimeMapping> new_seqno_to_time_mapping);

// Returns an additional owning reference to the seqno to time mapping held by
// this SuperVersion. Intended for the SuperVersion installed right after this
// one, so that both can refer to the same mapping object when the mapping did
// not change in between. The result is null when this SuperVersion holds no
// mapping.
std::shared_ptr<const SeqnoToTimeMapping> ShareSeqnoToTimeMapping() {
  std::shared_ptr<const SeqnoToTimeMapping> shared_mapping =
      seqno_to_time_mapping;
  return shared_mapping;
}

// Gives access to the seqno to time mapping of this SuperVersion without
// sharing its ownership: only the raw pointer stored in the shared_ptr is
// handed out, so the result is null when no mapping is held.
UnownedPtr<const SeqnoToTimeMapping> GetSeqnoToTimeMapping() const {
  const SeqnoToTimeMapping* raw_mapping = seqno_to_time_mapping.get();
  return raw_mapping;
}

// The value of dummy is not actually used. kSVInUse takes its address as a
// mark in the thread local storage to indicate the SuperVersion is in use
Expand Down
252 changes: 250 additions & 2 deletions db/compaction/tiered_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "rocksdb/listener.h"
#include "rocksdb/utilities/debug.h"
#include "test_util/mock_time_env.h"
#include "utilities/merge_operators.h"

namespace ROCKSDB_NAMESPACE {

Expand Down Expand Up @@ -1307,8 +1308,8 @@ TEST_F(TieredCompactionTest, CheckInternalKeyRange) {

class PrecludeLastLevelTest : public DBTestBase {
public:
PrecludeLastLevelTest()
: DBTestBase("preclude_last_level_test", /*env_do_fsync=*/false) {
PrecludeLastLevelTest(std::string test_name = "preclude_last_level_test")
: DBTestBase(test_name, /*env_do_fsync=*/false) {
mock_clock_ = std::make_shared<MockSystemClock>(env_->GetSystemClock());
mock_clock_->SetCurrentTime(kMockStartTime);
mock_env_ = std::make_unique<CompositeEnvWrapper>(env_, mock_clock_);
Expand Down Expand Up @@ -2256,6 +2257,253 @@ TEST_F(PrecludeLastLevelTest, RangeDelsCauseFileEndpointsToOverlap) {
Close();
}

// Tests that DBIter::GetProperty("rocksdb.iterator.write-time") returns an
// entry's approximate unix write time.
// Test Param:
// 1) use tailing iterator or regular iterator (when it applies)
class IteratorWriteTimeTest : public PrecludeLastLevelTest,
                              public testing::WithParamInterface<bool> {
 public:
  IteratorWriteTimeTest() : PrecludeLastLevelTest("iterator_write_time_test") {}

  // Expects `iter` to be valid and positioned at `expected_key`, then reads
  // the "rocksdb.iterator.write-time" property, decodes it with GetFixed64
  // and returns the decoded value. All checks are non-fatal (EXPECT_*), so a
  // value is returned even when one of them fails.
  uint64_t VerifyKeyAndGetWriteTime(Iterator* iter,
                                    const std::string& expected_key) {
    std::string prop;
    uint64_t write_time = 0;
    EXPECT_TRUE(iter->Valid());
    EXPECT_EQ(expected_key, iter->key());
    EXPECT_OK(iter->GetProperty("rocksdb.iterator.write-time", &prop));
    Slice prop_slice = prop;
    EXPECT_TRUE(GetFixed64(&prop_slice, &write_time));
    return write_time;
  }

  // Same checks as VerifyKeyAndGetWriteTime(), and additionally expects the
  // decoded write time to equal `expected_write_time`.
  void VerifyKeyAndWriteTime(Iterator* iter, const std::string& expected_key,
                             uint64_t expected_write_time) {
    // Delegate instead of duplicating the property decoding logic.
    EXPECT_EQ(expected_write_time,
              VerifyKeyAndGetWriteTime(iter, expected_key));
  }
};

// Writes kNumKeys entries, advancing the mock clock by kSecondsPerRecording
// before each write, and leaves them unflushed. Then checks the
// "rocksdb.iterator.write-time" property of every entry during forward and
// backward iteration.
TEST_P(IteratorWriteTimeTest, ReadFromMemtables) {
  const int kNumTrigger = 4;
  const int kNumLevels = 7;
  const int kNumKeys = 100;
  const int kSecondsPerRecording = 101;

  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.env = mock_env_.get();
  options.level0_file_num_compaction_trigger = kNumTrigger;
  options.preserve_internal_time_seconds = 10000;
  options.num_levels = kNumLevels;
  DestroyAndReopen(options);

  Random rnd(301);
  for (int i = 0; i < kNumKeys; i++) {
    dbfull()->TEST_WaitForPeriodicTaskRun(
        [&] { mock_clock_->MockSleepForSeconds(kSecondsPerRecording); });
    ASSERT_OK(Put(Key(i), rnd.RandomString(100)));
  }

  ReadOptions ropts;
  ropts.tailing = GetParam();
  int i;

  // Forward iteration. The write time reported for the first key becomes the
  // baseline (`start_time`) that all later expectations are derived from.
  uint64_t start_time = 0;
  {
    std::unique_ptr<Iterator> iter(dbfull()->NewIterator(ropts));
    for (iter->SeekToFirst(), i = 0; iter->Valid(); iter->Next(), i++) {
      if (start_time == 0) {
        start_time = VerifyKeyAndGetWriteTime(iter.get(), Key(i));
      } else {
        VerifyKeyAndWriteTime(iter.get(), Key(i),
                              start_time + kSecondsPerRecording * (i + 1));
      }
    }
    ASSERT_OK(iter->status());
    // Make sure the loop above did not pass vacuously: every written key must
    // have been visited.
    ASSERT_EQ(kNumKeys, i);
  }

  // Backward iteration. Always uses a regular iterator: the tailing flag is
  // cleared before the iterator is created.
  {
    ropts.tailing = false;
    std::unique_ptr<Iterator> iter(dbfull()->NewIterator(ropts));
    for (iter->SeekToLast(), i = kNumKeys - 1; iter->Valid();
         iter->Prev(), i--) {
      if (i == 0) {
        VerifyKeyAndWriteTime(iter.get(), Key(i), start_time);
      } else {
        VerifyKeyAndWriteTime(iter.get(), Key(i),
                              start_time + kSecondsPerRecording * (i + 1));
      }
    }
    ASSERT_OK(iter->status());
    // `i` starts at kNumKeys - 1 and is decremented once per visited entry,
    // so it ends at -1 exactly when all kNumKeys entries were visited.
    ASSERT_EQ(-1, i);
  }
  Close();
}

// Writes kNumKeys entries, advancing the mock clock by kSecondsPerRecording
// before each write, flushes them to an SST file and checks the
// "rocksdb.iterator.write-time" property of every entry:
//   1) during forward and backward iteration right after the flush,
//   2) after reopening the DB with seqno to time recording disabled,
//   3) after a full manual compaction.
TEST_P(IteratorWriteTimeTest, ReadFromSstFile) {
  const int kNumTrigger = 4;
  const int kNumLevels = 7;
  const int kNumKeys = 100;
  const int kSecondsPerRecording = 101;

  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.env = mock_env_.get();
  options.level0_file_num_compaction_trigger = kNumTrigger;
  options.preserve_internal_time_seconds = 10000;
  options.num_levels = kNumLevels;
  DestroyAndReopen(options);

  Random rnd(301);
  for (int i = 0; i < kNumKeys; i++) {
    dbfull()->TEST_WaitForPeriodicTaskRun(
        [&] { mock_clock_->MockSleepForSeconds(kSecondsPerRecording); });
    ASSERT_OK(Put(Key(i), rnd.RandomString(100)));
  }

  ASSERT_OK(Flush());
  ReadOptions ropts;
  ropts.tailing = GetParam();
  int i;

  // Forward iteration. The write time reported for the first key becomes the
  // baseline (`start_time`) that all later expectations are derived from.
  uint64_t start_time = 0;
  {
    std::unique_ptr<Iterator> iter(dbfull()->NewIterator(ropts));
    for (iter->SeekToFirst(), i = 0; iter->Valid(); iter->Next(), i++) {
      if (start_time == 0) {
        start_time = VerifyKeyAndGetWriteTime(iter.get(), Key(i));
      } else {
        VerifyKeyAndWriteTime(iter.get(), Key(i),
                              start_time + kSecondsPerRecording * (i + 1));
      }
    }
    ASSERT_OK(iter->status());
    // Make sure the loop above did not pass vacuously.
    ASSERT_EQ(kNumKeys, i);
  }

  // Backward iteration. Always uses a regular iterator: the tailing flag is
  // cleared before the iterator is created and stays cleared for the rest of
  // the test.
  {
    ropts.tailing = false;
    std::unique_ptr<Iterator> iter(dbfull()->NewIterator(ropts));
    for (iter->SeekToLast(), i = kNumKeys - 1; iter->Valid();
         iter->Prev(), i--) {
      if (i == 0) {
        VerifyKeyAndWriteTime(iter.get(), Key(i), start_time);
      } else {
        VerifyKeyAndWriteTime(iter.get(), Key(i),
                              start_time + kSecondsPerRecording * (i + 1));
      }
    }
    ASSERT_OK(iter->status());
    // `i` ends at -1 exactly when all kNumKeys entries were visited.
    ASSERT_EQ(-1, i);
  }

  // Reopen the DB and disable the seqno to time recording. Data retrieved from
  // SST files still have write time available.
  // The existing data has to survive this step, so the DB is reopened without
  // being destroyed first; otherwise the Key(i) checks below would iterate
  // over nothing.
  options.preserve_internal_time_seconds = 0;
  Reopen(options);

  dbfull()->TEST_WaitForPeriodicTaskRun(
      [&] { mock_clock_->MockSleepForSeconds(kSecondsPerRecording); });
  ASSERT_OK(Put("a", "val"));
  ASSERT_TRUE(dbfull()->TEST_GetSeqnoToTimeMapping().Empty());

  {
    std::unique_ptr<Iterator> iter(dbfull()->NewIterator(ropts));
    iter->SeekToFirst();
    ASSERT_TRUE(iter->Valid());
    // "a" is retrieved from memtable, its write time is unknown because the
    // seqno to time mapping recording is not available.
    VerifyKeyAndWriteTime(iter.get(), "a",
                          std::numeric_limits<uint64_t>::max());
    for (iter->Next(), i = 0; iter->Valid(); iter->Next(), i++) {
      if (i == 0) {
        VerifyKeyAndWriteTime(iter.get(), Key(i), start_time);
      } else {
        VerifyKeyAndWriteTime(iter.get(), Key(i),
                              start_time + kSecondsPerRecording * (i + 1));
      }
    }
    ASSERT_OK(iter->status());
    // All entries flushed before the reopen must still be visible.
    ASSERT_EQ(kNumKeys, i);
  }

  // There is no write time info for "a" after it's flushed to SST file either.
  ASSERT_OK(Flush());
  {
    std::unique_ptr<Iterator> iter(dbfull()->NewIterator(ropts));
    iter->SeekToFirst();
    ASSERT_TRUE(iter->Valid());
    VerifyKeyAndWriteTime(iter.get(), "a",
                          std::numeric_limits<uint64_t>::max());
  }

  // Sequence number zeroed out after compacted to the last level, write time
  // all becomes zero.
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  {
    std::unique_ptr<Iterator> iter(dbfull()->NewIterator(ropts));
    iter->SeekToFirst();
    ASSERT_TRUE(iter->Valid());
    // The first entry ("a" in the blocks above) is skipped; only the Key(i)
    // entries are checked here.
    for (iter->Next(), i = 0; iter->Valid(); iter->Next(), i++) {
      VerifyKeyAndWriteTime(iter.get(), Key(i), 0);
    }
    ASSERT_OK(iter->status());
    ASSERT_EQ(kNumKeys, i);
  }
  Close();
}

// Puts "foo", later puts "bar" and applies a merge operand to "foo", then
// checks through the "rocksdb.iterator.write-time" property that "foo"
// reports an older write time than "bar".
TEST_P(IteratorWriteTimeTest, MergeReturnsBaseValueWriteTime) {
  const int kNumTrigger = 4;
  const int kNumLevels = 7;
  const int kSecondsPerRecording = 101;

  Options options = CurrentOptions();
  options.env = mock_env_.get();
  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = kNumLevels;
  options.level0_file_num_compaction_trigger = kNumTrigger;
  options.preserve_internal_time_seconds = 10000;
  options.merge_operator = MergeOperators::CreateStringAppendOperator();
  DestroyAndReopen(options);

  // Moves the mock clock forward by one recording interval.
  auto advance_clock = [&] {
    mock_clock_->MockSleepForSeconds(kSecondsPerRecording);
  };

  // Base value of "foo", written first.
  dbfull()->TEST_WaitForPeriodicTaskRun(advance_clock);
  ASSERT_OK(Put("foo", "fv1"));

  // "bar" and the merge operand for "foo" are both written one interval later.
  dbfull()->TEST_WaitForPeriodicTaskRun(advance_clock);
  ASSERT_OK(Put("bar", "bv1"));
  ASSERT_OK(Merge("foo", "bv1"));

  ReadOptions read_opts;
  read_opts.tailing = GetParam();
  {
    std::unique_ptr<Iterator> it(dbfull()->NewIterator(read_opts));
    it->SeekToFirst();
    const uint64_t bar_time = VerifyKeyAndGetWriteTime(it.get(), "bar");
    it->Next();
    const uint64_t foo_time = VerifyKeyAndGetWriteTime(it.get(), "foo");
    // The base value's write time is the one reported for "foo", hence it is
    // older than the write time of "bar".
    ASSERT_GT(bar_time, foo_time);
    // No further entries are expected after "foo".
    it->Next();
    ASSERT_FALSE(it->Valid());
    ASSERT_OK(it->status());
  }

  Close();
}

// Instantiates every IteratorWriteTimeTest case for the bool test parameter
// values produced by testing::Bool(); the tests assign the parameter to
// ReadOptions::tailing.
INSTANTIATE_TEST_CASE_P(IteratorWriteTimeTest, IteratorWriteTimeTest,
testing::Bool());

} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
Expand Down
Loading

0 comments on commit f2546b6

Please sign in to comment.