diff --git a/db/builder.cc b/db/builder.cc index f9cc2a5eacd..bb0d7c41814 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -36,6 +36,7 @@ #include "rocksdb/iterator.h" #include "rocksdb/options.h" #include "rocksdb/table.h" +#include "seqno_to_time_mapping.h" #include "table/block_based/block_based_table_builder.h" #include "table/format.h" #include "table/internal_iterator.h" @@ -299,12 +300,14 @@ Status BuildTable( if (!s.ok() || empty) { builder->Abandon(); } else { - std::string seqno_to_time_mapping_str; - seqno_to_time_mapping.Encode( - seqno_to_time_mapping_str, meta->fd.smallest_seqno, - meta->fd.largest_seqno, meta->file_creation_time); + SeqnoToTimeMapping relevant_mapping; + relevant_mapping.CopyFromSeqnoRange(seqno_to_time_mapping, + meta->fd.smallest_seqno, + meta->fd.largest_seqno); + relevant_mapping.SetCapacity(kMaxSeqnoTimePairsPerSST); + relevant_mapping.Enforce(tboptions.file_creation_time); builder->SetSeqnoTimeTableProperties( - seqno_to_time_mapping_str, + relevant_mapping, ioptions.compaction_style == CompactionStyle::kCompactionStyleFIFO ? meta->file_creation_time : meta->oldest_ancester_time); diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 9d1a45f5b72..b82f4bb6b13 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -288,39 +288,37 @@ void CompactionJob::Prepare() { if (preserve_time_duration > 0) { const ReadOptions read_options(Env::IOActivity::kCompaction); - // setup seqno_to_time_mapping_ - seqno_to_time_mapping_.SetMaxTimeDuration(preserve_time_duration); + // Setup seqno_to_time_mapping_ with relevant time range. 
+ seqno_to_time_mapping_.SetMaxTimeSpan(preserve_time_duration); for (const auto& each_level : *c->inputs()) { for (const auto& fmd : each_level.files) { std::shared_ptr tp; Status s = cfd->current()->GetTableProperties(read_options, &tp, fmd, nullptr); if (s.ok()) { - seqno_to_time_mapping_.Add(tp->seqno_to_time_mapping) - .PermitUncheckedError(); - seqno_to_time_mapping_.Add(fmd->fd.smallest_seqno, - fmd->oldest_ancester_time); + s = seqno_to_time_mapping_.DecodeFrom(tp->seqno_to_time_mapping); + } + if (!s.ok()) { + ROCKS_LOG_WARN( + db_options_.info_log, + "Problem reading or processing seqno-to-time mapping: %s", + s.ToString().c_str()); } } } - auto status = seqno_to_time_mapping_.Sort(); - if (!status.ok()) { - ROCKS_LOG_WARN(db_options_.info_log, - "Invalid sequence number to time mapping: Status: %s", - status.ToString().c_str()); - } int64_t _current_time = 0; - status = db_options_.clock->GetCurrentTime(&_current_time); - if (!status.ok()) { + Status s = db_options_.clock->GetCurrentTime(&_current_time); + if (!s.ok()) { ROCKS_LOG_WARN(db_options_.info_log, "Failed to get current time in compaction: Status: %s", - status.ToString().c_str()); + s.ToString().c_str()); // preserve all time information preserve_time_min_seqno_ = 0; preclude_last_level_min_seqno_ = 0; + seqno_to_time_mapping_.Enforce(); } else { - seqno_to_time_mapping_.TruncateOldEntries(_current_time); + seqno_to_time_mapping_.Enforce(_current_time); uint64_t preserve_time = static_cast(_current_time) > preserve_time_duration ? _current_time - preserve_time_duration @@ -344,6 +342,16 @@ void CompactionJob::Prepare() { 1; } } + // For accuracy of the GetProximalSeqnoBeforeTime queries above, we only + // limit the capacity after them. + // Here If we set capacity to the per-SST limit, we could be throwing away + // fidelity when a compaction output file has a narrower seqno range than + // all the inputs. 
If we only limit capacity for each compaction output, we + // could be doing a lot of unnecessary recomputation in a large compaction + // (up to quadratic in number of files). Thus, we do something in the + // middle: enforce a reasonably large constant size limit substantially + // larger than kMaxSeqnoTimePairsPerSST. + seqno_to_time_mapping_.SetCapacity(kMaxSeqnoToTimeEntries); } } diff --git a/db/compaction/compaction_outputs.cc b/db/compaction/compaction_outputs.cc index 9ad2b3a0d5c..06b9a19ac2d 100644 --- a/db/compaction/compaction_outputs.cc +++ b/db/compaction/compaction_outputs.cc @@ -25,11 +25,11 @@ Status CompactionOutputs::Finish( assert(meta != nullptr); Status s = intput_status; if (s.ok()) { - std::string seqno_to_time_mapping_str; - seqno_to_time_mapping.Encode( - seqno_to_time_mapping_str, meta->fd.smallest_seqno, - meta->fd.largest_seqno, meta->file_creation_time); - builder_->SetSeqnoTimeTableProperties(seqno_to_time_mapping_str, + SeqnoToTimeMapping relevant_mapping; + relevant_mapping.CopyFromSeqnoRange( + seqno_to_time_mapping, meta->fd.smallest_seqno, meta->fd.largest_seqno); + relevant_mapping.SetCapacity(kMaxSeqnoTimePairsPerSST); + builder_->SetSeqnoTimeTableProperties(relevant_mapping, meta->oldest_ancester_time); s = builder_->Finish(); diff --git a/db/compaction/tiered_compaction_test.cc b/db/compaction/tiered_compaction_test.cc index 779b980d825..15bc75b94cd 100644 --- a/db/compaction/tiered_compaction_test.cc +++ b/db/compaction/tiered_compaction_test.cc @@ -1575,9 +1575,8 @@ TEST_F(PrecludeLastLevelTest, SmallPrecludeTime) { ASSERT_EQ(tables_props.size(), 1); ASSERT_FALSE(tables_props.begin()->second->seqno_to_time_mapping.empty()); SeqnoToTimeMapping tp_mapping; - ASSERT_OK( - tp_mapping.Add(tables_props.begin()->second->seqno_to_time_mapping)); - ASSERT_OK(tp_mapping.Sort()); + ASSERT_OK(tp_mapping.DecodeFrom( + tables_props.begin()->second->seqno_to_time_mapping)); ASSERT_FALSE(tp_mapping.Empty()); auto seqs = 
tp_mapping.TEST_GetInternalMapping(); ASSERT_FALSE(seqs.empty()); diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 1cd64eb1ad1..7a831551730 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -834,9 +834,15 @@ Status DBImpl::RegisterRecordSeqnoTimeWorker(const ReadOptions& read_options, } } if (min_preserve_seconds == std::numeric_limits::max()) { - seqno_to_time_mapping_.Resize(0, 0); + // Don't track + seqno_to_time_mapping_.SetCapacity(0); + seqno_to_time_mapping_.SetMaxTimeSpan(UINT64_MAX); } else { - seqno_to_time_mapping_.Resize(min_preserve_seconds, max_preserve_seconds); + uint64_t cap = std::min(kMaxSeqnoToTimeEntries, + max_preserve_seconds * kMaxSeqnoTimePairsPerCF / + min_preserve_seconds); + seqno_to_time_mapping_.SetCapacity(cap); + seqno_to_time_mapping_.SetMaxTimeSpan(max_preserve_seconds); } mapping_was_empty = seqno_to_time_mapping_.Empty(); } @@ -845,9 +851,8 @@ Status DBImpl::RegisterRecordSeqnoTimeWorker(const ReadOptions& read_options, if (min_preserve_seconds != std::numeric_limits::max()) { // round up to 1 when the time_duration is smaller than // kMaxSeqnoTimePairsPerCF - seqno_time_cadence = (min_preserve_seconds + - SeqnoToTimeMapping::kMaxSeqnoTimePairsPerCF - 1) / - SeqnoToTimeMapping::kMaxSeqnoTimePairsPerCF; + seqno_time_cadence = (min_preserve_seconds + kMaxSeqnoTimePairsPerCF - 1) / + kMaxSeqnoTimePairsPerCF; } TEST_SYNC_POINT_CALLBACK( @@ -884,7 +889,7 @@ Status DBImpl::RegisterRecordSeqnoTimeWorker(const ReadOptions& read_options, assert(mapping_was_empty); // We can simply modify these, before writes are allowed - constexpr uint64_t kMax = SeqnoToTimeMapping::kMaxSeqnoTimePairsPerSST; + constexpr uint64_t kMax = kMaxSeqnoTimePairsPerSST; versions_->SetLastAllocatedSequence(kMax); versions_->SetLastPublishedSequence(kMax); versions_->SetLastSequence(kMax); @@ -6639,28 +6644,24 @@ void DBImpl::RecordSeqnoToTimeMapping(uint64_t populate_historical_seconds) { 
immutable_db_options_.clock->GetCurrentTime(&unix_time_signed) .PermitUncheckedError(); // Ignore error uint64_t unix_time = static_cast(unix_time_signed); - bool appended = false; - { - InstrumentedMutexLock l(&mutex_); - if (populate_historical_seconds > 0) { + if (populate_historical_seconds > 0) { + bool success = true; + { + InstrumentedMutexLock l(&mutex_); if (seqno > 1 && unix_time > populate_historical_seconds) { // seqno=0 is reserved SequenceNumber from_seqno = 1; - appended = seqno_to_time_mapping_.PrePopulate( + success = seqno_to_time_mapping_.PrePopulate( from_seqno, seqno, unix_time - populate_historical_seconds, unix_time); } else { // One of these will fail assert(seqno > 1); assert(unix_time > populate_historical_seconds); + success = false; } - } else { - // FIXME: assert(seqno > 0); - appended = seqno_to_time_mapping_.Append(seqno, unix_time); } - } - if (populate_historical_seconds > 0) { - if (appended) { + if (success) { ROCKS_LOG_INFO( immutable_db_options_.info_log, "Pre-populated sequence number to time entries: [1,%" PRIu64 @@ -6673,11 +6674,11 @@ void DBImpl::RecordSeqnoToTimeMapping(uint64_t populate_historical_seconds) { "] -> [%" PRIu64 ",%" PRIu64 "]", seqno, unix_time - populate_historical_seconds, unix_time); } - } else if (!appended) { - ROCKS_LOG_WARN(immutable_db_options_.info_log, - "Failed to insert sequence number to time entry: %" PRIu64 - " -> %" PRIu64, - seqno, unix_time); + } else { + InstrumentedMutexLock l(&mutex_); + // FIXME: assert(seqno > 0); + // Always successful assuming seqno never go backwards + seqno_to_time_mapping_.Append(seqno, unix_time); } } diff --git a/db/event_helpers.cc b/db/event_helpers.cc index 65f6a5a4861..da1ed8ea3a2 100644 --- a/db/event_helpers.cc +++ b/db/event_helpers.cc @@ -145,7 +145,7 @@ void EventHelpers::LogAndNotifyTableFileCreationFinished( jwriter << "N/A"; } else { SeqnoToTimeMapping tmp; - Status status = tmp.Add(table_properties.seqno_to_time_mapping); + Status status = 
tmp.DecodeFrom(table_properties.seqno_to_time_mapping); if (status.ok()) { jwriter << tmp.ToHumanString(); } else { diff --git a/db/flush_job.cc b/db/flush_job.cc index 9ab14641735..6bf3ddba1bf 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -852,10 +852,9 @@ Status FlushJob::WriteLevel0Table() { SequenceNumber smallest_seqno = mems_.front()->GetEarliestSequenceNumber(); if (!db_impl_seqno_to_time_mapping_.Empty()) { - // make a local copy, as the seqno_to_time_mapping from db_impl is not - // thread safe, which will be used while not holding the db_mutex. - seqno_to_time_mapping_ = - db_impl_seqno_to_time_mapping_.Copy(smallest_seqno); + // make a local copy to use while not holding the db_mutex. + seqno_to_time_mapping_.CopyFromSeqnoRange(db_impl_seqno_to_time_mapping_, + smallest_seqno); } std::vector blob_file_additions; diff --git a/db/seqno_time_test.cc b/db/seqno_time_test.cc index 199c59c9bbb..45796a2d578 100644 --- a/db/seqno_time_test.cc +++ b/db/seqno_time_test.cc @@ -330,25 +330,41 @@ TEST_P(SeqnoTimeTablePropTest, BasicSeqnoToTimeMapping) { ASSERT_EQ(tables_props.size(), 1); auto it = tables_props.begin(); SeqnoToTimeMapping tp_mapping; - ASSERT_OK(tp_mapping.Add(it->second->seqno_to_time_mapping)); - ASSERT_OK(tp_mapping.Sort()); + ASSERT_OK(tp_mapping.DecodeFrom(it->second->seqno_to_time_mapping)); + ASSERT_TRUE(tp_mapping.TEST_IsEnforced()); ASSERT_FALSE(tp_mapping.Empty()); auto seqs = tp_mapping.TEST_GetInternalMapping(); // about ~20 seqs->time entries, because the sample rate is 10000/100, and it // passes 2k time. Add (roughly) one for starting entry. 
- ASSERT_GE(seqs.size(), 20); - ASSERT_LE(seqs.size(), 22); - SequenceNumber seq_end = dbfull()->GetLatestSequenceNumber() + 1; - for (auto i = start_seq; i < seq_end; i++) { - // The result is within the range - ASSERT_GE(tp_mapping.GetProximalTimeBeforeSeqno(i), - start_time + (i - start_seq) * 10 - 100); - ASSERT_LE(tp_mapping.GetProximalTimeBeforeSeqno(i), - start_time + (i - start_seq) * 10); - } + // Revised: with automatic pre-population of mappings, some of these entries + // might be purged to keep the DB mapping within capacity. + EXPECT_GE(seqs.size(), 20 / 2); + EXPECT_LE(seqs.size(), 22); + + auto ValidateProximalSeqnos = [&](const char* name, double fuzz_ratio) { + SequenceNumber seq_end = dbfull()->GetLatestSequenceNumber() + 1; + uint64_t end_time = mock_clock_->NowSeconds(); + uint64_t seqno_fuzz = + static_cast((seq_end - start_seq) * fuzz_ratio + 0.999999); + for (unsigned time_pct = 0; time_pct <= 100; time_pct++) { + SCOPED_TRACE("name=" + std::string(name) + + " time_pct=" + std::to_string(time_pct)); + // Validate the important proximal API (GetProximalSeqnoBeforeTime) + uint64_t t = start_time + time_pct * (end_time - start_time) / 100; + auto seqno_reported = tp_mapping.GetProximalSeqnoBeforeTime(t); + auto seqno_expected = start_seq + time_pct * (seq_end - start_seq) / 100; + EXPECT_LE(seqno_reported, seqno_expected); + if (end_time - t < 10000) { + EXPECT_LE(seqno_expected, seqno_reported + seqno_fuzz); + } + } + start_seq = seq_end; + start_time = end_time; + }; + + ValidateProximalSeqnos("a", 0.1); + checked_file_nums.insert(it->second->orig_file_number); - start_seq = seq_end; - start_time = mock_clock_->NowSeconds(); // Write a key every 1 seconds for (int i = 0; i < 200; i++) { @@ -356,7 +372,7 @@ TEST_P(SeqnoTimeTablePropTest, BasicSeqnoToTimeMapping) { dbfull()->TEST_WaitForPeriodicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(1)); }); } - seq_end = dbfull()->GetLatestSequenceNumber() + 1; + ASSERT_OK(Flush()); 
tables_props.clear(); ASSERT_OK(dbfull()->GetPropertiesOfAllTables(&tables_props)); @@ -371,21 +387,17 @@ TEST_P(SeqnoTimeTablePropTest, BasicSeqnoToTimeMapping) { ASSERT_TRUE(it != tables_props.end()); tp_mapping.Clear(); - ASSERT_OK(tp_mapping.Add(it->second->seqno_to_time_mapping)); - ASSERT_OK(tp_mapping.Sort()); + ASSERT_OK(tp_mapping.DecodeFrom(it->second->seqno_to_time_mapping)); + ASSERT_TRUE(tp_mapping.TEST_IsEnforced()); seqs = tp_mapping.TEST_GetInternalMapping(); // There only a few time sample ASSERT_GE(seqs.size(), 1); ASSERT_LE(seqs.size(), 3); - for (auto i = start_seq; i < seq_end; i++) { - ASSERT_GE(tp_mapping.GetProximalTimeBeforeSeqno(i), - start_time + (i - start_seq) - 100); - ASSERT_LE(tp_mapping.GetProximalTimeBeforeSeqno(i), - start_time + (i - start_seq)); - } + + // High fuzz ratio because of low number of samples + ValidateProximalSeqnos("b", 0.5); + checked_file_nums.insert(it->second->orig_file_number); - start_seq = seq_end; - start_time = mock_clock_->NowSeconds(); // Write a key every 200 seconds for (int i = 0; i < 200; i++) { @@ -393,7 +405,7 @@ TEST_P(SeqnoTimeTablePropTest, BasicSeqnoToTimeMapping) { dbfull()->TEST_WaitForPeriodicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(200)); }); } - seq_end = dbfull()->GetLatestSequenceNumber() + 1; + // seq_end = dbfull()->GetLatestSequenceNumber() + 1; ASSERT_OK(Flush()); tables_props.clear(); ASSERT_OK(dbfull()->GetPropertiesOfAllTables(&tables_props)); @@ -408,24 +420,16 @@ TEST_P(SeqnoTimeTablePropTest, BasicSeqnoToTimeMapping) { ASSERT_TRUE(it != tables_props.end()); tp_mapping.Clear(); - ASSERT_OK(tp_mapping.Add(it->second->seqno_to_time_mapping)); - ASSERT_OK(tp_mapping.Sort()); + ASSERT_OK(tp_mapping.DecodeFrom(it->second->seqno_to_time_mapping)); + ASSERT_TRUE(tp_mapping.TEST_IsEnforced()); seqs = tp_mapping.TEST_GetInternalMapping(); - // The sequence number -> time entries should be maxed - ASSERT_GE(seqs.size(), 99); - ASSERT_LE(seqs.size(), 101); - for (auto i 
= start_seq; i < seq_end; i++) { - // aged out entries allowed to report time=0 - if ((seq_end - i) * 200 <= 10000) { - ASSERT_GE(tp_mapping.GetProximalTimeBeforeSeqno(i), - start_time + (i - start_seq) * 200 - 100); - } - ASSERT_LE(tp_mapping.GetProximalTimeBeforeSeqno(i), - start_time + (i - start_seq) * 200); - } + // For the preserved time span, only 10000/200=50 (+1) entries were recorded + ASSERT_GE(seqs.size(), 50); + ASSERT_LE(seqs.size(), 51); + + ValidateProximalSeqnos("c", 0.04); + checked_file_nums.insert(it->second->orig_file_number); - start_seq = seq_end; - start_time = mock_clock_->NowSeconds(); // Write a key every 100 seconds for (int i = 0; i < 200; i++) { @@ -433,7 +437,6 @@ TEST_P(SeqnoTimeTablePropTest, BasicSeqnoToTimeMapping) { dbfull()->TEST_WaitForPeriodicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(100)); }); } - seq_end = dbfull()->GetLatestSequenceNumber() + 1; ASSERT_OK(Flush()); tables_props.clear(); ASSERT_OK(dbfull()->GetPropertiesOfAllTables(&tables_props)); @@ -447,9 +450,11 @@ TEST_P(SeqnoTimeTablePropTest, BasicSeqnoToTimeMapping) { } ASSERT_TRUE(it != tables_props.end()); tp_mapping.Clear(); - ASSERT_OK(tp_mapping.Add(it->second->seqno_to_time_mapping)); - ASSERT_OK(tp_mapping.Sort()); + ASSERT_OK(tp_mapping.DecodeFrom(it->second->seqno_to_time_mapping)); + ASSERT_TRUE(tp_mapping.TEST_IsEnforced()); seqs = tp_mapping.TEST_GetInternalMapping(); + // For the preserved time span, max entries were recorded and + // preserved (10000/100=100 (+1)) ASSERT_GE(seqs.size(), 99); ASSERT_LE(seqs.size(), 101); @@ -474,21 +479,14 @@ TEST_P(SeqnoTimeTablePropTest, BasicSeqnoToTimeMapping) { } ASSERT_TRUE(it != tables_props.end()); tp_mapping.Clear(); - ASSERT_OK(tp_mapping.Add(it->second->seqno_to_time_mapping)); - ASSERT_OK(tp_mapping.Sort()); + ASSERT_OK(tp_mapping.DecodeFrom(it->second->seqno_to_time_mapping)); + ASSERT_TRUE(tp_mapping.TEST_IsEnforced()); seqs = tp_mapping.TEST_GetInternalMapping(); ASSERT_GE(seqs.size(), 
99); ASSERT_LE(seqs.size(), 101); - for (auto i = start_seq; i < seq_end; i++) { - // aged out entries allowed to report time=0 - // FIXME: should be <= - if ((seq_end - i) * 100 < 10000) { - ASSERT_GE(tp_mapping.GetProximalTimeBeforeSeqno(i), - start_time + (i - start_seq) * 100 - 100); - } - ASSERT_LE(tp_mapping.GetProximalTimeBeforeSeqno(i), - start_time + (i - start_seq) * 100); - } + + ValidateProximalSeqnos("d", 0.02); + ASSERT_OK(db_->Close()); } @@ -545,8 +543,8 @@ TEST_P(SeqnoTimeTablePropTest, MultiCFs) { ASSERT_EQ(tables_props.size(), 1); it = tables_props.begin(); SeqnoToTimeMapping tp_mapping; - ASSERT_OK(tp_mapping.Add(it->second->seqno_to_time_mapping)); - ASSERT_OK(tp_mapping.Sort()); + ASSERT_OK(tp_mapping.DecodeFrom(it->second->seqno_to_time_mapping)); + ASSERT_TRUE(tp_mapping.TEST_IsEnforced()); ASSERT_FALSE(tp_mapping.Empty()); auto seqs = tp_mapping.TEST_GetInternalMapping(); ASSERT_GE(seqs.size(), 1); @@ -565,7 +563,8 @@ TEST_P(SeqnoTimeTablePropTest, MultiCFs) { } seqs = dbfull()->TEST_GetSeqnoToTimeMapping().TEST_GetInternalMapping(); ASSERT_GE(seqs.size(), 1000 - 1); - ASSERT_LE(seqs.size(), 1000 + 1); + // Non-strict limit can exceed capacity by a reasonable fraction + ASSERT_LE(seqs.size(), 1000 * 9 / 8); ASSERT_OK(Flush(2)); tables_props.clear(); @@ -573,8 +572,8 @@ TEST_P(SeqnoTimeTablePropTest, MultiCFs) { ASSERT_EQ(tables_props.size(), 1); it = tables_props.begin(); tp_mapping.Clear(); - ASSERT_OK(tp_mapping.Add(it->second->seqno_to_time_mapping)); - ASSERT_OK(tp_mapping.Sort()); + ASSERT_OK(tp_mapping.DecodeFrom(it->second->seqno_to_time_mapping)); + ASSERT_TRUE(tp_mapping.TEST_IsEnforced()); seqs = tp_mapping.TEST_GetInternalMapping(); // the max encoded entries is 100 ASSERT_GE(seqs.size(), 100 - 1); @@ -606,8 +605,8 @@ TEST_P(SeqnoTimeTablePropTest, MultiCFs) { ASSERT_EQ(tables_props.size(), 1); it = tables_props.begin(); tp_mapping.Clear(); - ASSERT_OK(tp_mapping.Add(it->second->seqno_to_time_mapping)); - 
ASSERT_OK(tp_mapping.Sort()); + ASSERT_OK(tp_mapping.DecodeFrom(it->second->seqno_to_time_mapping)); + ASSERT_TRUE(tp_mapping.TEST_IsEnforced()); seqs = tp_mapping.TEST_GetInternalMapping(); ASSERT_GE(seqs.size(), 99); ASSERT_LE(seqs.size(), 101); @@ -721,8 +720,8 @@ TEST_P(SeqnoTimeTablePropTest, SeqnoToTimeMappingUniversal) { for (const auto& props : tables_props) { ASSERT_FALSE(props.second->seqno_to_time_mapping.empty()); SeqnoToTimeMapping tp_mapping; - ASSERT_OK(tp_mapping.Add(props.second->seqno_to_time_mapping)); - ASSERT_OK(tp_mapping.Sort()); + ASSERT_OK(tp_mapping.DecodeFrom(props.second->seqno_to_time_mapping)); + ASSERT_TRUE(tp_mapping.TEST_IsEnforced()); ASSERT_FALSE(tp_mapping.Empty()); auto seqs = tp_mapping.TEST_GetInternalMapping(); // Add (roughly) one for starting entry. @@ -746,7 +745,8 @@ TEST_P(SeqnoTimeTablePropTest, SeqnoToTimeMappingUniversal) { auto it = tables_props.begin(); SeqnoToTimeMapping tp_mapping; ASSERT_FALSE(it->second->seqno_to_time_mapping.empty()); - ASSERT_OK(tp_mapping.Add(it->second->seqno_to_time_mapping)); + ASSERT_OK(tp_mapping.DecodeFrom(it->second->seqno_to_time_mapping)); + ASSERT_TRUE(tp_mapping.TEST_IsEnforced()); // compact to the last level CompactRangeOptions cro; @@ -773,7 +773,8 @@ TEST_P(SeqnoTimeTablePropTest, SeqnoToTimeMappingUniversal) { it = tables_props.begin(); ASSERT_FALSE(it->second->seqno_to_time_mapping.empty()); - ASSERT_OK(tp_mapping.Add(it->second->seqno_to_time_mapping)); + ASSERT_OK(tp_mapping.DecodeFrom(it->second->seqno_to_time_mapping)); + ASSERT_TRUE(tp_mapping.TEST_IsEnforced()); // make half of the data expired mock_clock_->MockSleepForSeconds(static_cast(8000)); @@ -929,7 +930,7 @@ TEST_P(SeqnoTimeTablePropTest, PrePopulateInDB) { DestroyAndReopen(track_options); // Ensure pre-population - constexpr auto kPrePopPairs = SeqnoToTimeMapping::kMaxSeqnoTimePairsPerSST; + constexpr auto kPrePopPairs = kMaxSeqnoTimePairsPerSST; sttm = dbfull()->TEST_GetSeqnoToTimeMapping(); latest_seqno = 
db_->GetLatestSequenceNumber(); start_time = mock_clock_->NowSeconds(); @@ -970,7 +971,7 @@ TEST_P(SeqnoTimeTablePropTest, PrePopulateInDB) { sttm = dbfull()->TEST_GetSeqnoToTimeMapping(); latest_seqno = db_->GetLatestSequenceNumber(); end_time = mock_clock_->NowSeconds(); - ASSERT_EQ(sttm.Size(), kPrePopPairs); + ASSERT_GE(sttm.Size(), kPrePopPairs); ASSERT_EQ(sttm.GetProximalSeqnoBeforeTime(end_time), latest_seqno); ASSERT_EQ(sttm.GetProximalSeqnoBeforeTime(start_time - kPreserveSecs / 2), kPrePopPairs / 2); @@ -1015,42 +1016,45 @@ TEST_P(SeqnoTimeTablePropTest, PrePopulateInDB) { } TEST_F(SeqnoTimeTest, MappingAppend) { - SeqnoToTimeMapping test(/*max_time_duration=*/100, /*max_capacity=*/10); + using P = SeqnoToTimeMapping::SeqnoTimePair; + SeqnoToTimeMapping test; + test.SetMaxTimeSpan(100).SetCapacity(10); // ignore seqno == 0, as it may mean the seqno is zeroed out - ASSERT_FALSE(test.Append(0, 9)); + ASSERT_FALSE(test.Append(0, 100)); - ASSERT_TRUE(test.Append(3, 10)); + ASSERT_TRUE(test.Append(3, 200)); auto size = test.Size(); // normal add - ASSERT_TRUE(test.Append(10, 11)); + ASSERT_TRUE(test.Append(10, 300)); size++; ASSERT_EQ(size, test.Size()); - // Append unsorted - ASSERT_FALSE(test.Append(8, 12)); - ASSERT_EQ(size, test.Size()); - // Append with the same seqno, newer time is rejected because that makes // GetProximalSeqnoBeforeTime queries worse (see later test) - ASSERT_FALSE(test.Append(10, 12)); + ASSERT_FALSE(test.Append(10, 301)); ASSERT_EQ(size, test.Size()); - // older time will be ignored - ASSERT_FALSE(test.Append(10, 9)); + ASSERT_EQ(test.TEST_GetLastEntry(), P({10, 300})); + + // Same or new seqno with same or older time (as last successfully added) is + // accepted by replacing last entry (improves GetProximalSeqnoBeforeTime + // queries without blowing up size) + ASSERT_FALSE(test.Append(10, 299)); ASSERT_EQ(size, test.Size()); + ASSERT_EQ(test.TEST_GetLastEntry(), P({10, 299})); - // new seqno with old time will be ignored - 
ASSERT_FALSE(test.Append(12, 8)); + ASSERT_FALSE(test.Append(11, 299)); ASSERT_EQ(size, test.Size()); + ASSERT_EQ(test.TEST_GetLastEntry(), P({11, 299})); - // new seqno with same time is accepted by replacing last entry - // (improves GetProximalSeqnoBeforeTime queries without blowing up size) - ASSERT_TRUE(test.Append(12, 11)); + ASSERT_FALSE(test.Append(11, 250)); ASSERT_EQ(size, test.Size()); + ASSERT_EQ(test.TEST_GetLastEntry(), P({11, 250})); } TEST_F(SeqnoTimeTest, ProximalFunctions) { - SeqnoToTimeMapping test(/*max_time_duration=*/100, /*max_capacity=*/10); + SeqnoToTimeMapping test; + test.SetCapacity(10); EXPECT_EQ(test.GetProximalTimeBeforeSeqno(1), kUnknownTimeBeforeAll); EXPECT_EQ(test.GetProximalTimeBeforeSeqno(1000000000000U), @@ -1081,6 +1085,7 @@ TEST_F(SeqnoTimeTest, ProximalFunctions) { // More samples EXPECT_TRUE(test.Append(20, 600)); EXPECT_TRUE(test.Append(30, 700)); + EXPECT_EQ(test.Size(), 3U); EXPECT_EQ(test.GetProximalTimeBeforeSeqno(10), kUnknownTimeBeforeAll); EXPECT_EQ(test.GetProximalTimeBeforeSeqno(11), 500U); @@ -1140,8 +1145,9 @@ TEST_F(SeqnoTimeTest, ProximalFunctions) { // Burst of writes during a short time creates an opportunity // for better results from GetProximalSeqnoBeforeTime(), at the - // expense of GetProximalTimeBeforeSeqno(). - EXPECT_TRUE(test.Append(50, 900)); + // expense of GetProximalTimeBeforeSeqno(). False return indicates + // merge with previous entry. 
+ EXPECT_FALSE(test.Append(50, 900)); // These are subject to later revision depending on priorities EXPECT_EQ(test.GetProximalTimeBeforeSeqno(49), 700U); @@ -1151,7 +1157,8 @@ TEST_F(SeqnoTimeTest, ProximalFunctions) { } TEST_F(SeqnoTimeTest, PrePopulate) { - SeqnoToTimeMapping test(/*max_time_duration=*/100, /*max_capacity=*/10); + SeqnoToTimeMapping test; + test.SetMaxTimeSpan(100).SetCapacity(10); EXPECT_EQ(test.Size(), 0U); @@ -1194,14 +1201,15 @@ TEST_F(SeqnoTimeTest, PrePopulate) { } } -TEST_F(SeqnoTimeTest, TruncateOldEntries) { - constexpr uint64_t kMaxTimeDuration = 42; - SeqnoToTimeMapping test(kMaxTimeDuration, /*max_capacity=*/10); +TEST_F(SeqnoTimeTest, EnforceWithNow) { + constexpr uint64_t kMaxTimeSpan = 420; + SeqnoToTimeMapping test; + test.SetMaxTimeSpan(kMaxTimeSpan).SetCapacity(10); EXPECT_EQ(test.Size(), 0U); // Safe on empty mapping - test.TruncateOldEntries(500); + test.Enforce(/*now=*/500); EXPECT_EQ(test.Size(), 0U); @@ -1223,13 +1231,13 @@ TEST_F(SeqnoTimeTest, TruncateOldEntries) { // etc. 
// Must keep first entry - test.TruncateOldEntries(500 + kMaxTimeDuration); + test.Enforce(/*now=*/500 + kMaxTimeSpan); EXPECT_EQ(test.Size(), 5U); - test.TruncateOldEntries(599 + kMaxTimeDuration); + test.Enforce(/*now=*/599 + kMaxTimeSpan); EXPECT_EQ(test.Size(), 5U); // Purges first entry - test.TruncateOldEntries(600 + kMaxTimeDuration); + test.Enforce(/*now=*/600 + kMaxTimeSpan); EXPECT_EQ(test.Size(), 4U); EXPECT_EQ(test.GetProximalSeqnoBeforeTime(500), kUnknownSeqnoBeforeAll); @@ -1239,20 +1247,20 @@ TEST_F(SeqnoTimeTest, TruncateOldEntries) { EXPECT_EQ(test.GetProximalSeqnoBeforeTime(700), 30U); // No effect - test.TruncateOldEntries(600 + kMaxTimeDuration); + test.Enforce(/*now=*/600 + kMaxTimeSpan); EXPECT_EQ(test.Size(), 4U); - test.TruncateOldEntries(699 + kMaxTimeDuration); + test.Enforce(/*now=*/699 + kMaxTimeSpan); EXPECT_EQ(test.Size(), 4U); // Purges next two - test.TruncateOldEntries(899 + kMaxTimeDuration); + test.Enforce(/*now=*/899 + kMaxTimeSpan); EXPECT_EQ(test.Size(), 2U); EXPECT_EQ(test.GetProximalSeqnoBeforeTime(799), kUnknownSeqnoBeforeAll); EXPECT_EQ(test.GetProximalSeqnoBeforeTime(899), 40U); // Always keep last entry, to have a non-trivial seqno bound - test.TruncateOldEntries(10000000); + test.Enforce(/*now=*/10000000); EXPECT_EQ(test.Size(), 1U); EXPECT_EQ(test.GetProximalSeqnoBeforeTime(10000000), 50U); @@ -1262,67 +1270,114 @@ TEST_F(SeqnoTimeTest, Sort) { SeqnoToTimeMapping test; // single entry - test.Add(10, 11); - ASSERT_OK(test.Sort()); + test.AddUnenforced(10, 11); + test.Enforce(); ASSERT_EQ(test.Size(), 1); - // duplicate, should be removed by sort - test.Add(10, 11); - // same seqno, but older time, should be removed - test.Add(10, 9); + // duplicate is ignored + test.AddUnenforced(10, 11); + test.Enforce(); + ASSERT_EQ(test.Size(), 1); - // unuseful ones, should be removed by sort - test.Add(11, 9); - test.Add(9, 8); + // add some revised mappings for that seqno + test.AddUnenforced(10, 10); + test.AddUnenforced(10, 12); 
- // Good ones - test.Add(1, 10); - test.Add(100, 100); + // We currently favor GetProximalSeqnoBeforeTime over + // GetProximalTimeBeforeSeqno by keeping the older time. + test.Enforce(); + auto seqs = test.TEST_GetInternalMapping(); + std::deque expected; + expected.emplace_back(10, 10); + ASSERT_EQ(expected, seqs); - ASSERT_OK(test.Sort()); + // add an inconsistent / unuseful mapping + test.AddUnenforced(9, 11); + test.Enforce(); + seqs = test.TEST_GetInternalMapping(); + ASSERT_EQ(expected, seqs); - auto seqs = test.TEST_GetInternalMapping(); + // And a mapping that is considered more useful (for + // GetProximalSeqnoBeforeTime) and thus replaces that one + test.AddUnenforced(11, 9); + test.Enforce(); + seqs = test.TEST_GetInternalMapping(); + expected.clear(); + expected.emplace_back(11, 9); + ASSERT_EQ(expected, seqs); - std::deque expected; - expected.emplace_back(1, 10); - expected.emplace_back(10, 11); + // Add more good, non-mergeable entries + test.AddUnenforced(1, 5); + test.AddUnenforced(100, 100); + test.Enforce(); + seqs = test.TEST_GetInternalMapping(); + expected.clear(); + expected.emplace_back(1, 5); + expected.emplace_back(11, 9); expected.emplace_back(100, 100); - ASSERT_EQ(expected, seqs); } TEST_F(SeqnoTimeTest, EncodeDecodeBasic) { - SeqnoToTimeMapping test(0, 1000); + constexpr uint32_t kOriginalSamples = 1000; + SeqnoToTimeMapping test; + test.SetCapacity(kOriginalSamples); std::string output; - test.Encode(output, 0, 1000, 100); + test.EncodeTo(output); ASSERT_TRUE(output.empty()); - for (int i = 1; i <= 1000; i++) { - ASSERT_TRUE(test.Append(i, i * 10)); - } - test.Encode(output, 0, 1000, 100); + ASSERT_OK(test.DecodeFrom(output)); + ASSERT_EQ(test.Size(), 0U); + Random rnd(123); + for (uint32_t i = 1; i <= kOriginalSamples; i++) { + ASSERT_TRUE(test.Append(i, i * 10 + rnd.Uniform(10))); + } + output.clear(); + test.EncodeTo(output); ASSERT_FALSE(output.empty()); SeqnoToTimeMapping decoded; - ASSERT_OK(decoded.Add(output)); - 
ASSERT_OK(decoded.Sort()); - ASSERT_EQ(decoded.Size(), SeqnoToTimeMapping::kMaxSeqnoTimePairsPerSST); - ASSERT_EQ(test.Size(), 1000); - - for (SequenceNumber seq = 0; seq <= 1000; seq++) { - // test has the more accurate time mapping, encode only pick - // kMaxSeqnoTimePairsPerSST number of entries, which is less accurate - uint64_t target_time = test.GetProximalTimeBeforeSeqno(seq); - ASSERT_GE(decoded.GetProximalTimeBeforeSeqno(seq), - target_time < 200 ? 0 : target_time - 200); - ASSERT_LE(decoded.GetProximalTimeBeforeSeqno(seq), target_time); + ASSERT_OK(decoded.DecodeFrom(output)); + ASSERT_TRUE(decoded.TEST_IsEnforced()); + ASSERT_EQ(test.Size(), decoded.Size()); + ASSERT_EQ(test.TEST_GetInternalMapping(), decoded.TEST_GetInternalMapping()); + + // Encode a reduced set of mappings + constexpr uint32_t kReducedSize = 51U; + output.clear(); + SeqnoToTimeMapping(test).SetCapacity(kReducedSize).EncodeTo(output); + + decoded.Clear(); + ASSERT_OK(decoded.DecodeFrom(output)); + ASSERT_TRUE(decoded.TEST_IsEnforced()); + ASSERT_EQ(decoded.Size(), kReducedSize); + + for (uint64_t t = 1; t <= kOriginalSamples * 11; t += 1 + t / 100) { + SCOPED_TRACE("t=" + std::to_string(t)); + // `test` has the more accurate time mapping, but the reduced set should + // nicely span and approximate the whole range + auto orig_s = test.GetProximalSeqnoBeforeTime(t); + auto approx_s = decoded.GetProximalSeqnoBeforeTime(t); + // The oldest entry should be preserved exactly + ASSERT_EQ(orig_s == kUnknownSeqnoBeforeAll, + approx_s == kUnknownSeqnoBeforeAll); + // The newest entry should be preserved exactly + ASSERT_EQ(orig_s == kOriginalSamples, approx_s == kOriginalSamples); + + // Approximate seqno before time should err toward older seqno to avoid + // classifying data as old too early, but should be within a reasonable + // bound. 
+ constexpr uint32_t kSeqnoFuzz = kOriginalSamples * 3 / 2 / kReducedSize; + EXPECT_GE(approx_s + kSeqnoFuzz, orig_s); + EXPECT_GE(orig_s, approx_s); } } -TEST_F(SeqnoTimeTest, EncodeDecodePerferNewTime) { - SeqnoToTimeMapping test(0, 10); +TEST_F(SeqnoTimeTest, EncodeDecodeMinimizeTimeGaps) { + SeqnoToTimeMapping test; + test.SetCapacity(10); test.Append(1, 10); test.Append(5, 17); @@ -1330,41 +1385,41 @@ TEST_F(SeqnoTimeTest, EncodeDecodePerferNewTime) { test.Append(8, 30); std::string output; - test.Encode(output, 1, 10, 0, 3); + SeqnoToTimeMapping(test).SetCapacity(3).EncodeTo(output); SeqnoToTimeMapping decoded; - ASSERT_OK(decoded.Add(output)); - ASSERT_OK(decoded.Sort()); + ASSERT_OK(decoded.DecodeFrom(output)); + ASSERT_TRUE(decoded.TEST_IsEnforced()); ASSERT_EQ(decoded.Size(), 3); auto seqs = decoded.TEST_GetInternalMapping(); std::deque expected; expected.emplace_back(1, 10); - expected.emplace_back(6, 25); + expected.emplace_back(5, 17); expected.emplace_back(8, 30); ASSERT_EQ(expected, seqs); // Add a few large time number test.Append(10, 100); test.Append(13, 200); - test.Append(16, 300); + test.Append(40, 250); + test.Append(70, 300); output.clear(); - test.Encode(output, 1, 20, 0, 4); + SeqnoToTimeMapping(test).SetCapacity(4).EncodeTo(output); decoded.Clear(); - ASSERT_OK(decoded.Add(output)); - ASSERT_OK(decoded.Sort()); + ASSERT_OK(decoded.DecodeFrom(output)); + ASSERT_TRUE(decoded.TEST_IsEnforced()); ASSERT_EQ(decoded.Size(), 4); expected.clear(); + // Except for beginning and end, entries are removed that minimize the + // remaining time gaps, regardless of seqno gaps. expected.emplace_back(1, 10); - // entry #6, #8 are skipped as they are too close to #1. - // entry #100 is also within skip range, but if it's skipped, there not enough - // number to fill 4 entries, so select it. 
expected.emplace_back(10, 100); expected.emplace_back(13, 200); - expected.emplace_back(16, 300); + expected.emplace_back(70, 300); seqs = decoded.TEST_GetInternalMapping(); ASSERT_EQ(expected, seqs); } diff --git a/db/seqno_to_time_mapping.cc b/db/seqno_to_time_mapping.cc index 97a3e987986..7fd11e90a36 100644 --- a/db/seqno_to_time_mapping.cc +++ b/db/seqno_to_time_mapping.cc @@ -6,6 +6,14 @@ #include "db/seqno_to_time_mapping.h" +#include +#include +#include +#include +#include +#include +#include + #include "db/version_edit.h" #include "util/string_util.h" @@ -13,25 +21,28 @@ namespace ROCKSDB_NAMESPACE { SeqnoToTimeMapping::pair_const_iterator SeqnoToTimeMapping::FindGreaterTime( uint64_t time) const { + assert(enforced_); return std::upper_bound(pairs_.cbegin(), pairs_.cend(), SeqnoTimePair{0, time}, SeqnoTimePair::TimeLess); } SeqnoToTimeMapping::pair_const_iterator SeqnoToTimeMapping::FindGreaterEqSeqno( SequenceNumber seqno) const { + assert(enforced_); return std::lower_bound(pairs_.cbegin(), pairs_.cend(), SeqnoTimePair{seqno, 0}, SeqnoTimePair::SeqnoLess); } SeqnoToTimeMapping::pair_const_iterator SeqnoToTimeMapping::FindGreaterSeqno( SequenceNumber seqno) const { + assert(enforced_); return std::upper_bound(pairs_.cbegin(), pairs_.cend(), SeqnoTimePair{seqno, 0}, SeqnoTimePair::SeqnoLess); } uint64_t SeqnoToTimeMapping::GetProximalTimeBeforeSeqno( SequenceNumber seqno) const { - assert(is_sorted_); + assert(enforced_); // Find the last entry with a seqno strictly less than the given seqno. 
// First, find the first entry >= the given seqno (or end) auto it = FindGreaterEqSeqno(seqno); @@ -43,43 +54,9 @@ uint64_t SeqnoToTimeMapping::GetProximalTimeBeforeSeqno( return it->time; } -void SeqnoToTimeMapping::Add(SequenceNumber seqno, uint64_t time) { - if (seqno == 0) { - return; - } - is_sorted_ = false; - pairs_.emplace_back(seqno, time); -} - -void SeqnoToTimeMapping::TruncateOldEntries(const uint64_t now) { - assert(is_sorted_); - - if (max_time_duration_ == 0) { - // No cutoff time - return; - } - - if (now < max_time_duration_) { - // Would under-flow - return; - } - - const uint64_t cut_off_time = now - max_time_duration_; - assert(cut_off_time <= now); // no under/overflow - - auto it = FindGreaterTime(cut_off_time); - if (it == pairs_.cbegin()) { - return; - } - // Move back one, to the entry that would be used to return a good seqno from - // GetProximalSeqnoBeforeTime(cut_off_time) - --it; - // Remove everything strictly before that entry - pairs_.erase(pairs_.cbegin(), std::move(it)); -} - -SequenceNumber SeqnoToTimeMapping::GetProximalSeqnoBeforeTime(uint64_t time) { - assert(is_sorted_); +SequenceNumber SeqnoToTimeMapping::GetProximalSeqnoBeforeTime( + uint64_t time) const { + assert(enforced_); // Find the last entry with a time <= the given time. // First, find the first entry > the given time (or end). @@ -92,130 +69,317 @@ SequenceNumber SeqnoToTimeMapping::GetProximalSeqnoBeforeTime(uint64_t time) { return it->seqno; } -// The encoded format is: -// [num_of_entries][[seqno][time],[seqno][time],...] -// ^ ^ -// var_int delta_encoded (var_int) -void SeqnoToTimeMapping::Encode(std::string& dest, const SequenceNumber start, - const SequenceNumber end, const uint64_t now, - const uint64_t output_size) const { - assert(is_sorted_); - if (start > end) { - // It could happen when the SST file is empty, the initial value of min - // sequence number is kMaxSequenceNumber and max is 0. 
- // The empty output file will be removed in the final step of compaction. - return; +void SeqnoToTimeMapping::EnforceMaxTimeSpan(uint64_t now) { + assert(enforced_); // at least sorted + uint64_t cutoff_time; + if (now > 0) { + if (pairs_.size() <= 1) { + return; + } + if (now < max_time_span_) { + // Nothing eligible to prune / avoid underflow + return; + } + cutoff_time = now - max_time_span_; + } else { + if (pairs_.size() <= 2) { + // Need to keep at least two if we don't know the current time + return; + } + const auto& last = pairs_.back(); + if (last.time < max_time_span_) { + // Nothing eligible to prune / avoid underflow + return; + } + cutoff_time = last.time - max_time_span_; } - - auto start_it = FindGreaterSeqno(start); - if (start_it != pairs_.begin()) { - start_it--; + // Keep one entry <= cutoff_time + while (pairs_.size() >= 2 && pairs_[0].time <= cutoff_time && + pairs_[1].time <= cutoff_time) { + pairs_.pop_front(); } +} - auto end_it = FindGreaterSeqno(end); - if (end_it == pairs_.begin()) { +void SeqnoToTimeMapping::EnforceCapacity(bool strict) { + assert(enforced_); // at least sorted + uint64_t strict_cap = capacity_; + if (strict_cap == 0) { + pairs_.clear(); return; } - if (start_it >= end_it) { + // Treat cap of 1 as 2 to work with the below algorithm (etc.) + // TODO: unit test + if (strict_cap == 1) { + strict_cap = 2; + } + // When !strict, allow being over nominal capacity by a modest fraction. + uint64_t effective_cap = strict_cap + (strict ? 0 : strict_cap / 8); + if (effective_cap < strict_cap) { + // Correct overflow + effective_cap = UINT64_MAX; + } + if (pairs_.size() <= effective_cap) { return; } + // The below algorithm expects at least one removal candidate between first + // and last. 
+ assert(pairs_.size() >= 3); + size_t to_remove_count = pairs_.size() - strict_cap; + + struct RemovalCandidate { + uint64_t new_time_gap; + std::deque::iterator it; + RemovalCandidate(uint64_t _new_time_gap, + std::deque::iterator _it) + : new_time_gap(_new_time_gap), it(_it) {} + bool operator>(const RemovalCandidate& other) const { + if (new_time_gap == other.new_time_gap) { + // If same gap, treat the newer entry as less attractive + // for removal (like larger gap) + return it->seqno > other.it->seqno; + } + return new_time_gap > other.new_time_gap; + } + }; + + // A priority queue of best removal candidates (smallest time gap remaining + // after removal) + using RC = RemovalCandidate; + using PQ = std::priority_queue, std::greater>; + PQ pq; + + // Add all the candidates (not including first and last) + { + auto it = pairs_.begin(); + assert(it->time != kUnknownTimeBeforeAll); + uint64_t prev_prev_time = it->time; + ++it; + assert(it->time != kUnknownTimeBeforeAll); + auto prev_it = it; + ++it; + while (it != pairs_.end()) { + assert(it->time != kUnknownTimeBeforeAll); + uint64_t gap = it->time - prev_prev_time; + pq.emplace(gap, prev_it); + prev_prev_time = prev_it->time; + prev_it = it; + ++it; + } + } - // truncate old entries that are not needed - if (max_time_duration_ > 0) { - const uint64_t cut_off_time = - now > max_time_duration_ ? now - max_time_duration_ : 0; - while (start_it < end_it && start_it->time < cut_off_time) { - start_it++; + // Greedily remove the best candidate, iteratively + while (to_remove_count > 0) { + assert(!pq.empty()); + // Remove the candidate with smallest gap + auto rc = pq.top(); + pq.pop(); + + // NOTE: priority_queue does not support updating an existing element, + // but we can work around that because the gap tracked in pq is only + // going to be better than actuality, and we can detect and adjust + // when a better-than-actual gap is found. 
+ + // Determine actual time gap if this entry is removed (zero entries are + // marked for deletion) + auto it = rc.it + 1; + uint64_t after_time = it->time; + while (after_time == kUnknownTimeBeforeAll) { + assert(it != pairs_.end()); + ++it; + after_time = it->time; + } + it = rc.it - 1; + uint64_t before_time = it->time; + while (before_time == kUnknownTimeBeforeAll) { + assert(it != pairs_.begin()); + --it; + before_time = it->time; + } + // Check whether the gap is still valid (or needs to be recomputed) + if (rc.new_time_gap == after_time - before_time) { + // Mark the entry as removed + rc.it->time = kUnknownTimeBeforeAll; + --to_remove_count; + } else { + // Insert a replacement up-to-date removal candidate + pq.emplace(after_time - before_time, rc.it); } } - // to include the first element - if (start_it != pairs_.begin()) { - start_it--; - } - - // If there are more data than needed, pick the entries for encoding. - // It's not the most optimized algorithm for selecting the best representative - // entries over the time. - // It starts from the beginning and makes sure the distance is larger than - // `(end - start) / size` before selecting the number. For example, for the - // following list, pick 3 entries (it will pick seqno #1, #6, #8): - // 1 -> 10 - // 5 -> 17 - // 6 -> 25 - // 8 -> 30 - // first, it always picks the first one, then there are 2 num_entries_to_fill - // and the time difference between current one vs. the last one is - // (30 - 10) = 20. 20/2 = 10. So it will skip until 10+10 = 20. => it skips - // #5 and pick #6. - // But the most optimized solution is picking #1 #5 #8, as it will be more - // evenly distributed for time. Anyway the following algorithm is simple and - // may over-select new data, which is good. We do want more accurate time - // information for recent data. 
- std::deque output_copy; - if (std::distance(start_it, end_it) > static_cast(output_size)) { - int64_t num_entries_to_fill = static_cast(output_size); - auto last_it = end_it; - last_it--; - uint64_t end_time = last_it->time; - uint64_t skip_until_time = 0; - for (auto it = start_it; it < end_it; it++) { - // skip if it's not reach the skip_until_time yet - if (std::distance(it, end_it) > num_entries_to_fill && - it->time < skip_until_time) { - continue; + + // Collapse away entries marked for deletion + auto from_it = pairs_.begin(); + auto to_it = from_it; + + for (; from_it != pairs_.end(); ++from_it) { + if (from_it->time != kUnknownTimeBeforeAll) { + if (from_it != to_it) { + *to_it = *from_it; } - output_copy.push_back(*it); - num_entries_to_fill--; - if (std::distance(it, end_it) > num_entries_to_fill && - num_entries_to_fill > 0) { - // If there are more entries than we need, re-calculate the - // skip_until_time, which means skip until that time - skip_until_time = - it->time + ((end_time - it->time) / num_entries_to_fill); + ++to_it; + } + } + + // Erase slots freed up + pairs_.erase(to_it, pairs_.end()); + assert(pairs_.size() == strict_cap); +} + +bool SeqnoToTimeMapping::SeqnoTimePair::Merge(const SeqnoTimePair& other) { + assert(seqno <= other.seqno); + if (seqno == other.seqno) { + // Favoring GetProximalSeqnoBeforeTime over GetProximalTimeBeforeSeqno + // by keeping the older time. For example, consider nothing has been + // written to the DB in some time. + time = std::min(time, other.time); + return true; + } else if (time == other.time) { + // Favoring GetProximalSeqnoBeforeTime over GetProximalTimeBeforeSeqno + // by keeping the newer seqno. For example, when a burst of writes ages + // out, we want the cutoff to be the newest seqno from that burst. + seqno = std::max(seqno, other.seqno); + return true; + } else if (time > other.time) { + assert(seqno < other.seqno); + // Need to resolve an inconsistency (clock drift? very rough time?). 
+ // Given the direction that entries are supposed to err, trust the earlier + // time entry as more reliable, and this choice ensures we don't + // accidentally throw out an entry within our time span. + *this = other; + return true; + } else { + // Not merged + return false; + } +} + +void SeqnoToTimeMapping::SortAndMerge() { + assert(!enforced_); + if (!pairs_.empty()) { + std::sort(pairs_.begin(), pairs_.end()); + + auto from_it = pairs_.begin(); + auto to_it = from_it; + for (++from_it; from_it != pairs_.end(); ++from_it) { + if (to_it->Merge(*from_it)) { + // Merged with last entry + } else { + // Copy into next entry + *++to_it = *from_it; } } + // Erase slots freed up from merging + pairs_.erase(to_it + 1, pairs_.end()); + } + // Mark as "at least sorted" + enforced_ = true; +} + +SeqnoToTimeMapping& SeqnoToTimeMapping::SetMaxTimeSpan(uint64_t max_time_span) { + max_time_span_ = max_time_span; + if (enforced_) { + EnforceMaxTimeSpan(); + } + return *this; +} + +SeqnoToTimeMapping& SeqnoToTimeMapping::SetCapacity(uint64_t capacity) { + capacity_ = capacity; + if (enforced_) { + EnforceCapacity(/*strict=*/true); + } + return *this; +} - // Make sure all entries are filled - assert(num_entries_to_fill == 0); - start_it = output_copy.begin(); - end_it = output_copy.end(); +SeqnoToTimeMapping& SeqnoToTimeMapping::Enforce(uint64_t now) { + if (!enforced_) { + SortAndMerge(); + assert(enforced_); + EnforceMaxTimeSpan(now); + } else if (now > 0) { + EnforceMaxTimeSpan(now); } + EnforceCapacity(/*strict=*/true); + return *this; +} - // Delta encode the data - uint64_t size = std::distance(start_it, end_it); - PutVarint64(&dest, size); +void SeqnoToTimeMapping::AddUnenforced(SequenceNumber seqno, uint64_t time) { + if (seqno == 0) { + return; + } + enforced_ = false; + pairs_.emplace_back(seqno, time); +} + +// The encoded format is: +// [num_of_entries][[seqno][time],[seqno][time],...] 
+// ^ ^ +// var_int delta_encoded (var_int) +// Except empty string is used for empty mapping. This means the encoding +// doesn't fully form a prefix code, but that is OK for applications like +// TableProperties. +void SeqnoToTimeMapping::EncodeTo(std::string& dest) const { + assert(enforced_); + // Can use empty string for empty mapping + if (pairs_.empty()) { + return; + } + // Encode number of entries + PutVarint64(&dest, pairs_.size()); SeqnoTimePair base; - for (auto it = start_it; it < end_it; it++) { - assert(base < *it); - SeqnoTimePair val = it->ComputeDelta(base); - base = *it; + for (auto& cur : pairs_) { + assert(base < cur); + // Delta encode each entry + SeqnoTimePair val = cur.ComputeDelta(base); + base = cur; val.Encode(dest); } } -Status SeqnoToTimeMapping::Add(const std::string& pairs_str) { - Slice input(pairs_str); +namespace { +Status DecodeImpl(Slice& input, + std::deque& pairs) { if (input.empty()) { return Status::OK(); } - uint64_t size; - if (!GetVarint64(&input, &size)) { + uint64_t count; + if (!GetVarint64(&input, &count)) { return Status::Corruption("Invalid sequence number time size"); } - is_sorted_ = false; - SeqnoTimePair base; - for (uint64_t i = 0; i < size; i++) { - SeqnoTimePair val; + + SeqnoToTimeMapping::SeqnoTimePair base; + for (uint64_t i = 0; i < count; i++) { + SeqnoToTimeMapping::SeqnoTimePair val; Status s = val.Decode(input); if (!s.ok()) { return s; } val.ApplyDelta(base); - pairs_.emplace_back(val); + pairs.emplace_back(val); base = val; } + + if (!input.empty()) { + return Status::Corruption( + "Extra bytes at end of sequence number time mapping"); + } return Status::OK(); } +} // namespace + +Status SeqnoToTimeMapping::DecodeFrom(const std::string& pairs_str) { + size_t orig_size = pairs_.size(); + + Slice input(pairs_str); + Status s = DecodeImpl(input, pairs_); + if (!s.ok()) { + // Roll back in case of corrupted data + pairs_.resize(orig_size); + } else if (orig_size > 0 || max_time_span_ < UINT64_MAX || + 
capacity_ < UINT64_MAX) { + enforced_ = false; + } + return s; +} void SeqnoToTimeMapping::SeqnoTimePair::Encode(std::string& dest) const { PutVarint64Varint64(&dest, seqno, time); @@ -231,38 +395,69 @@ Status SeqnoToTimeMapping::SeqnoTimePair::Decode(Slice& input) { return Status::OK(); } -bool SeqnoToTimeMapping::Append(SequenceNumber seqno, uint64_t time) { - assert(is_sorted_); +void SeqnoToTimeMapping::CopyFromSeqnoRange(const SeqnoToTimeMapping& src, + SequenceNumber from_seqno, + SequenceNumber to_seqno) { + bool orig_empty = Empty(); + auto src_it = src.FindGreaterEqSeqno(from_seqno); + // To best answer GetProximalTimeBeforeSeqno(from_seqno) we need an entry + // with a seqno before that (if available) + if (src_it != src.pairs_.begin()) { + --src_it; + } + auto src_it_end = src.FindGreaterSeqno(to_seqno); + std::copy(src_it, src_it_end, std::back_inserter(pairs_)); - // skip seq number 0, which may have special meaning, like zeroed out data - if (seqno == 0) { + if (!orig_empty || max_time_span_ < UINT64_MAX || capacity_ < UINT64_MAX) { + enforced_ = false; + } +} + +bool SeqnoToTimeMapping::Append(SequenceNumber seqno, uint64_t time) { + if (capacity_ == 0) { return false; } - if (!Empty()) { - if (seqno < Last().seqno || time < Last().time) { - return false; - } - if (seqno == Last().seqno) { - // Updating Last() would hurt GetProximalSeqnoBeforeTime() queries, so - // NOT doing it (for now) - return false; - } - if (time == Last().time) { - // Updating Last() here helps GetProximalSeqnoBeforeTime() queries, so - // doing it (for now) - Last().seqno = seqno; - return true; + bool added = false; + if (seqno == 0) { + // skip seq number 0, which may have special meaning, like zeroed out data + } else if (pairs_.empty()) { + enforced_ = true; + pairs_.push_back({seqno, time}); + // skip normal enforced check below + return true; + } else { + auto& last = pairs_.back(); + // We can attempt to merge with the last entry if the new entry sorts with + // it. 
+ if (last.seqno <= seqno) { + bool merged = last.Merge({seqno, time}); + if (!merged) { + if (enforced_ && (seqno <= last.seqno || time <= last.time)) { + // Out of order append should not happen, except in case of clock + // reset + assert(false); + } else { + pairs_.push_back({seqno, time}); + added = true; + } + } + } else if (!enforced_) { + // Treat like AddUnenforced and fix up below + pairs_.push_back({seqno, time}); + added = true; + } else { + // Out of order append attempted + assert(false); } } - - pairs_.emplace_back(seqno, time); - - if (pairs_.size() > max_capacity_) { - // FIXME: be smarter about how we erase to avoid data falling off the - // front prematurely. - pairs_.pop_front(); + // Similar to Enforce() but not quite + if (!enforced_) { + SortAndMerge(); + assert(enforced_); } - return true; + EnforceMaxTimeSpan(); + EnforceCapacity(/*strict=*/false); + return added; } bool SeqnoToTimeMapping::PrePopulate(SequenceNumber from_seqno, @@ -284,64 +479,6 @@ bool SeqnoToTimeMapping::PrePopulate(SequenceNumber from_seqno, return /*success*/ true; } -bool SeqnoToTimeMapping::Resize(uint64_t min_time_duration, - uint64_t max_time_duration) { - uint64_t new_max_capacity = - CalculateMaxCapacity(min_time_duration, max_time_duration); - if (new_max_capacity == max_capacity_) { - return false; - } else if (new_max_capacity < pairs_.size()) { - uint64_t delta = pairs_.size() - new_max_capacity; - // FIXME: be smarter about how we erase to avoid data falling off the - // front prematurely. 
- pairs_.erase(pairs_.begin(), pairs_.begin() + delta); - } - max_capacity_ = new_max_capacity; - return true; -} - -Status SeqnoToTimeMapping::Sort() { - if (is_sorted_) { - return Status::OK(); - } - if (pairs_.empty()) { - is_sorted_ = true; - return Status::OK(); - } - - std::deque copy = std::move(pairs_); - - std::sort(copy.begin(), copy.end()); - - pairs_.clear(); - - // remove seqno = 0, which may have special meaning, like zeroed out data - while (copy.front().seqno == 0) { - copy.pop_front(); - } - - SeqnoTimePair prev = copy.front(); - for (const auto& it : copy) { - // If sequence number is the same, pick the one with larger time, which is - // more accurate than the older time. - if (it.seqno == prev.seqno) { - assert(it.time >= prev.time); - prev.time = it.time; - } else { - assert(it.seqno > prev.seqno); - // If a larger sequence number has an older time which is not useful, skip - if (it.time > prev.time) { - pairs_.push_back(prev); - prev = it; - } - } - } - pairs_.emplace_back(prev); - - is_sorted_ = true; - return Status::OK(); -} - std::string SeqnoToTimeMapping::ToHumanString() const { std::string ret; for (const auto& seq_time : pairs_) { @@ -353,25 +490,4 @@ std::string SeqnoToTimeMapping::ToHumanString() const { return ret; } -SeqnoToTimeMapping SeqnoToTimeMapping::Copy( - SequenceNumber smallest_seqno) const { - SeqnoToTimeMapping ret; - auto it = FindGreaterSeqno(smallest_seqno); - if (it != pairs_.begin()) { - it--; - } - std::copy(it, pairs_.end(), std::back_inserter(ret.pairs_)); - return ret; -} - -uint64_t SeqnoToTimeMapping::CalculateMaxCapacity(uint64_t min_time_duration, - uint64_t max_time_duration) { - if (min_time_duration == 0) { - return 0; - } - return std::min( - kMaxSeqnoToTimeEntries, - max_time_duration * kMaxSeqnoTimePairsPerCF / min_time_duration); -} - } // namespace ROCKSDB_NAMESPACE diff --git a/db/seqno_to_time_mapping.h b/db/seqno_to_time_mapping.h index 95a4455be18..d30a991d614 100644 --- 
a/db/seqno_to_time_mapping.h +++ b/db/seqno_to_time_mapping.h @@ -8,11 +8,13 @@ #include #include +#include #include #include #include #include +#include "db/dbformat.h" #include "rocksdb/status.h" #include "rocksdb/types.h" @@ -21,6 +23,22 @@ namespace ROCKSDB_NAMESPACE { constexpr uint64_t kUnknownTimeBeforeAll = 0; constexpr SequenceNumber kUnknownSeqnoBeforeAll = 0; +// Maximum number of entries can be encoded into SST. The data is delta encode +// so the maximum data usage for each SST is < 0.3K +constexpr uint64_t kMaxSeqnoTimePairsPerSST = 100; + +// Maximum number of entries per CF. If there's only CF with this feature on, +// the max span divided by this number, so for example, if +// preclude_last_level_data_seconds = 100000 (~1day), then it will sample the +// seqno -> time every 1000 seconds (~17minutes). Then the maximum entry it +// needs is 100. +// When there are multiple CFs having this feature on, the sampling cadence is +// determined by the smallest setting, the capacity is determined the largest +// setting, also it's caped by kMaxSeqnoTimePairsPerCF * 10. +constexpr uint64_t kMaxSeqnoTimePairsPerCF = 100; + +constexpr uint64_t kMaxSeqnoToTimeEntries = kMaxSeqnoTimePairsPerCF * 10; + // SeqnoToTimeMapping stores a sampled mapping from sequence numbers to // unix times (seconds since epoch). This information provides rough bounds // between sequence numbers and their write times, but is primarily designed @@ -39,27 +57,16 @@ constexpr SequenceNumber kUnknownSeqnoBeforeAll = 0; // 20 -> 600 // 30 -> 700 // -// In typical operation, the list is sorted, both among seqnos and among times, -// with a bounded number of entries, but some public working states violate -// these constraints. +// In typical operation, the list is in "enforced" operation to maintain +// invariants on sortedness, capacity, and time span of entries. 
However, some +// operations will put the object into "unenforced" mode where those invariants +// are relaxed until explicitly or implicitly re-enforced (which will sort and +// filter the data). // -// NOT thread safe - requires external synchronization. +// NOT thread safe - requires external synchronization, except a const +// object allows concurrent reads. class SeqnoToTimeMapping { public: - // Maximum number of entries can be encoded into SST. The data is delta encode - // so the maximum data usage for each SST is < 0.3K - static constexpr uint64_t kMaxSeqnoTimePairsPerSST = 100; - - // Maximum number of entries per CF. If there's only CF with this feature on, - // the max duration divided by this number, so for example, if - // preclude_last_level_data_seconds = 100000 (~1day), then it will sample the - // seqno -> time every 1000 seconds (~17minutes). Then the maximum entry it - // needs is 100. - // When there are multiple CFs having this feature on, the sampling cadence is - // determined by the smallest setting, the capacity is determined the largest - // setting, also it's caped by kMaxSeqnoTimePairsPerCF * 10. - static constexpr uint64_t kMaxSeqnoTimePairsPerCF = 100; - // A simple struct for sequence number to time pair struct SeqnoTimePair { SequenceNumber seqno = 0; @@ -86,6 +93,12 @@ class SeqnoToTimeMapping { time += delta_or_base.time; } + // If another pair can be combined into this one (for optimizing + // normal SeqnoToTimeMapping behavior), then this mapping is modified + // and true is returned, indicating the other mapping can be discarded. + // Otherwise false is returned and nothing is changed. + bool Merge(const SeqnoTimePair& other); + // Ordering used for Sort() bool operator<(const SeqnoTimePair& other) const { return std::tie(seqno, time) < std::tie(other.seqno, other.time); @@ -104,27 +117,77 @@ class SeqnoToTimeMapping { } }; - // constractor of SeqnoToTimeMapping - // max_time_duration is the maximum time it should track. 
For example, if - // preclude_last_level_data_seconds is 1 day, then if an entry is older than 1 - // day, then it can be removed. - // max_capacity is the maximum number of entry it can hold. For single CF, - // it's caped at 100 (kMaxSeqnoTimePairsPerCF), otherwise - // kMaxSeqnoTimePairsPerCF * 10. - // If it's set to 0, means it won't truncate any old data. - explicit SeqnoToTimeMapping(uint64_t max_time_duration = 0, - uint64_t max_capacity = 0) - : max_time_duration_(max_time_duration), max_capacity_(max_capacity) {} - - // Both seqno range and time range are inclusive. ... TODO - // + // Construct an empty SeqnoToTimeMapping with no limits. + SeqnoToTimeMapping() {} + + // ==== Configuration for enforced state ==== // + // Set a time span beyond which old entries can be deleted. Specifically, + // under enforcement mode, the structure will maintian only one entry older + // than the newest entry time minus max_time_span, so that + // GetProximalSeqnoBeforeTime queries back to that time return a good result. + // UINT64_MAX == unlimited. Returns *this. + SeqnoToTimeMapping& SetMaxTimeSpan(uint64_t max_time_span); + + // Set the nominal capacity under enforcement mode. The structure is allowed + // to grow some reasonable fraction larger but will automatically compact + // down to this size. UINT64_MAX == unlimited. Returns *this. + SeqnoToTimeMapping& SetCapacity(uint64_t capacity); + + // ==== Modifiers, enforced ==== // + // Adds a series of mappings interpolating from from_seqno->from_time to + // to_seqno->to_time. This can only be called on an empty object and both + // seqno range and time range are inclusive. bool PrePopulate(SequenceNumber from_seqno, SequenceNumber to_seqno, uint64_t from_time, uint64_t to_time); - // Append a new entry to the list. The new entry should be newer than the - // existing ones. It maintains the internal sorted status. + // Append a new entry to the list. The `seqno` should be >= all previous + // entries. 
This operation maintains enforced mode invariants, and will + // automatically (re-)enter enforced mode if not already in that state. + // Returns false if the entry was merged into the most recent entry + // rather than creating a new entry. bool Append(SequenceNumber seqno, uint64_t time); + // Clear all entries and (re-)enter enforced mode if not already in that + // state. Enforced limits are unchanged. + void Clear() { + pairs_.clear(); + enforced_ = true; + } + + // Enters the "enforced" state if not already in that state, which is + // useful before copying or querying. This will + // * Sort the entries + // * Discard any obsolete entries, which is aided if the caller specifies + // the `now` time so that entries older than now minus the max time span can + // be discarded. + // * Compact the entries to the configured capacity. + // Returns *this. + SeqnoToTimeMapping& Enforce(uint64_t now = 0); + + // ==== Modifiers, unenforced ==== // + // Add a new random entry and enter "unenforced" state. Unlike Append(), it + // can be any historical data. + void AddUnenforced(SequenceNumber seqno, uint64_t time); + + // Decode and add the entries to this mapping object. Unless starting from + // an empty mapping with no configured enforcement limits, this operation + // enters the unenforced state. + Status DecodeFrom(const std::string& pairs_str); + + // Copies entries from the src mapping object to this one, limited to entries + // needed to answer GetProximalTimeBeforeSeqno() queries for the given + // *inclusive* seqno range. The source structure must be in enforced + // state as a precondition. Unless starting with this object as empty mapping + // with no configured enforcement limits, this object enters the unenforced + // state. 
+ void CopyFromSeqnoRange(const SeqnoToTimeMapping& src, + SequenceNumber from_seqno, + SequenceNumber to_seqno = kMaxSequenceNumber); + void CopyFrom(const SeqnoToTimeMapping& src) { + CopyFromSeqnoRange(src, kUnknownSeqnoBeforeAll, kMaxSequenceNumber); + } + + // ==== Accessors ==== // // Given a sequence number, return the best (largest / newest) known time // that is no later than the write time of that given sequence number. // If no such specific time is known, returns kUnknownTimeBeforeAll. @@ -133,12 +196,10 @@ class SeqnoToTimeMapping { // GetProximalTimeBeforeSeqno(11) -> 500 // GetProximalTimeBeforeSeqno(20) -> 500 // GetProximalTimeBeforeSeqno(21) -> 600 + // Because this is a const operation depending on sortedness, the structure + // must be in enforced state as a precondition. uint64_t GetProximalTimeBeforeSeqno(SequenceNumber seqno) const; - // Remove any entries not needed for GetProximalSeqnoBeforeTime queries of - // times older than `now - max_time_duration_` - void TruncateOldEntries(uint64_t now); - // Given a time, return the best (largest) sequence number whose write time // is no later than that given time. If no such specific sequence number is // known, returns kUnknownSeqnoBeforeAll. Using the example in the class @@ -147,74 +208,46 @@ class SeqnoToTimeMapping { // GetProximalSeqnoBeforeTime(500) -> 10 // GetProximalSeqnoBeforeTime(599) -> 10 // GetProximalSeqnoBeforeTime(600) -> 20 - SequenceNumber GetProximalSeqnoBeforeTime(uint64_t time); - - // Encode to a binary string. start and end seqno are both inclusive. - void Encode(std::string& des, SequenceNumber start, SequenceNumber end, - uint64_t now, - uint64_t output_size = kMaxSeqnoTimePairsPerSST) const; + // Because this is a const operation depending on sortedness, the structure + // must be in enforced state as a precondition. 
+ SequenceNumber GetProximalSeqnoBeforeTime(uint64_t time) const; - // Add a new random entry, unlike Append(), it can be any data, but also makes - // the list un-sorted. - void Add(SequenceNumber seqno, uint64_t time); - - // Decode and add the entries to the current obj. The list will be unsorted - Status Add(const std::string& pairs_str); + // Encode to a binary string by appending to `dest`. + // Because this is a const operation depending on sortedness, the structure + // must be in enforced state as a precondition. + void EncodeTo(std::string& dest) const; // Return the number of entries size_t Size() const { return pairs_.size(); } - // Reduce the size of internal list - bool Resize(uint64_t min_time_duration, uint64_t max_time_duration); - - // Override the max_time_duration_ - void SetMaxTimeDuration(uint64_t max_time_duration) { - max_time_duration_ = max_time_duration; - } - - uint64_t GetCapacity() const { return max_capacity_; } - - // Sort the list, which also remove the redundant entries, useless entries, - // which makes sure the seqno is sorted, but also the time - Status Sort(); - - // copy the current obj from the given smallest_seqno. 
- SeqnoToTimeMapping Copy(SequenceNumber smallest_seqno) const; + uint64_t GetCapacity() const { return capacity_; } // If the internal list is empty bool Empty() const { return pairs_.empty(); } - // clear all entries - void Clear() { pairs_.clear(); } - // return the string for user message // Note: Not efficient, okay for print std::string ToHumanString() const; #ifndef NDEBUG + const SeqnoTimePair& TEST_GetLastEntry() const { return pairs_.back(); } const std::deque& TEST_GetInternalMapping() const { return pairs_; } + bool TEST_IsEnforced() const { return enforced_; } #endif private: - static constexpr uint64_t kMaxSeqnoToTimeEntries = - kMaxSeqnoTimePairsPerCF * 10; - - uint64_t max_time_duration_; - uint64_t max_capacity_; + uint64_t max_time_span_ = UINT64_MAX; + uint64_t capacity_ = UINT64_MAX; std::deque pairs_; - bool is_sorted_ = true; + bool enforced_ = true; - static uint64_t CalculateMaxCapacity(uint64_t min_time_duration, - uint64_t max_time_duration); - - SeqnoTimePair& Last() { - assert(!Empty()); - return pairs_.back(); - } + void EnforceMaxTimeSpan(uint64_t now = 0); + void EnforceCapacity(bool strict); + void SortAndMerge(); using pair_const_iterator = std::deque::const_iterator; diff --git a/table/block_based/block_based_table_builder.cc b/table/block_based/block_based_table_builder.cc index 6bd28804c73..d4a0fa0dd7e 100644 --- a/table/block_based/block_based_table_builder.cc +++ b/table/block_based/block_based_table_builder.cc @@ -2105,9 +2105,9 @@ const char* BlockBasedTableBuilder::GetFileChecksumFuncName() const { } } void BlockBasedTableBuilder::SetSeqnoTimeTableProperties( - const std::string& encoded_seqno_to_time_mapping, - uint64_t oldest_ancestor_time) { - rep_->props.seqno_to_time_mapping = encoded_seqno_to_time_mapping; + const SeqnoToTimeMapping& relevant_mapping, uint64_t oldest_ancestor_time) { + assert(rep_->props.seqno_to_time_mapping.empty()); + relevant_mapping.EncodeTo(rep_->props.seqno_to_time_mapping); 
rep_->props.creation_time = oldest_ancestor_time; } diff --git a/table/block_based/block_based_table_builder.h b/table/block_based/block_based_table_builder.h index 3949474c580..f3360f8bcbc 100644 --- a/table/block_based/block_based_table_builder.h +++ b/table/block_based/block_based_table_builder.h @@ -106,9 +106,8 @@ class BlockBasedTableBuilder : public TableBuilder { // Get file checksum function name const char* GetFileChecksumFuncName() const override; - void SetSeqnoTimeTableProperties( - const std::string& encoded_seqno_to_time_mapping, - uint64_t oldest_ancestor_time) override; + void SetSeqnoTimeTableProperties(const SeqnoToTimeMapping& relevant_mapping, + uint64_t oldest_ancestor_time) override; private: bool ok() const { return status().ok(); } diff --git a/table/plain/plain_table_builder.cc b/table/plain/plain_table_builder.cc index 32f53be49a2..3047cae60c7 100644 --- a/table/plain/plain_table_builder.cc +++ b/table/plain/plain_table_builder.cc @@ -339,10 +339,10 @@ const char* PlainTableBuilder::GetFileChecksumFuncName() const { return kUnknownFileChecksumFuncName; } } -void PlainTableBuilder::SetSeqnoTimeTableProperties(const std::string& string, - uint64_t uint_64) { +void PlainTableBuilder::SetSeqnoTimeTableProperties( + const SeqnoToTimeMapping& relevant_mapping, uint64_t uint_64) { // TODO: storing seqno to time mapping is not yet support for plain table. 
- TableBuilder::SetSeqnoTimeTableProperties(string, uint_64); + TableBuilder::SetSeqnoTimeTableProperties(relevant_mapping, uint_64); } } // namespace ROCKSDB_NAMESPACE diff --git a/table/plain/plain_table_builder.h b/table/plain/plain_table_builder.h index fb7ea63be50..77f3c7acb66 100644 --- a/table/plain/plain_table_builder.h +++ b/table/plain/plain_table_builder.h @@ -95,7 +95,7 @@ class PlainTableBuilder : public TableBuilder { // Get file checksum function name const char* GetFileChecksumFuncName() const override; - void SetSeqnoTimeTableProperties(const std::string& string, + void SetSeqnoTimeTableProperties(const SeqnoToTimeMapping& relevant_mapping, uint64_t uint_64) override; private: diff --git a/table/table_builder.h b/table/table_builder.h index c01d03cb2c0..1ae19980788 100644 --- a/table/table_builder.h +++ b/table/table_builder.h @@ -228,9 +228,10 @@ class TableBuilder { // Return file checksum function name virtual const char* GetFileChecksumFuncName() const = 0; - // Set the sequence number to time mapping + // Set the sequence number to time mapping. `relevant_mapping` must be in + // enforced state (ready to encode to string). virtual void SetSeqnoTimeTableProperties( - const std::string& /*encoded_seqno_to_time_mapping*/, + const SeqnoToTimeMapping& /*relevant_mapping*/, uint64_t /*oldest_ancestor_time*/){}; }; diff --git a/table/table_properties.cc b/table/table_properties.cc index 17a13543de8..0a899af37a6 100644 --- a/table/table_properties.cc +++ b/table/table_properties.cc @@ -163,7 +163,7 @@ std::string TableProperties::ToString(const std::string& prop_delim, kv_delim); SeqnoToTimeMapping seq_time_mapping; - s = seq_time_mapping.Add(seqno_to_time_mapping); + s = seq_time_mapping.DecodeFrom(seqno_to_time_mapping); AppendProperty(result, "Sequence number to time mapping", s.ok() ? 
seq_time_mapping.ToHumanString() : "N/A", prop_delim, kv_delim); diff --git a/unreleased_history/bug_fixes/seqnototimemapping.md b/unreleased_history/bug_fixes/seqnototimemapping.md new file mode 100644 index 00000000000..b1bd5cc4e96 --- /dev/null +++ b/unreleased_history/bug_fixes/seqnototimemapping.md @@ -0,0 +1 @@ +Fixed issues with experimental `preclude_last_level_data_seconds` option that could interfere with expected data tiering.