Skip to content

Commit

Permalink
refactor(meta): optimize Hummock version delta deletion (#20114)
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 authored Jan 13, 2025
1 parent f21f7a0 commit 7e26b2f
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 22 deletions.
21 changes: 17 additions & 4 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,16 +525,22 @@ pub struct MetaDeveloperConfig {
#[serde(default = "default::developer::actor_cnt_per_worker_parallelism_hard_limit")]
pub actor_cnt_per_worker_parallelism_hard_limit: usize,

#[serde(default = "default::developer::hummock_time_travel_sst_info_fetch_batch_size")]
/// Max number of SSTs fetched from meta store per SELECT, during time travel Hummock version replay.
#[serde(default = "default::developer::hummock_time_travel_sst_info_fetch_batch_size")]
pub hummock_time_travel_sst_info_fetch_batch_size: usize,

#[serde(default = "default::developer::hummock_time_travel_sst_info_insert_batch_size")]
/// Max number of SSTs inserted into meta store per INSERT, during time travel metadata writing.
#[serde(default = "default::developer::hummock_time_travel_sst_info_insert_batch_size")]
pub hummock_time_travel_sst_info_insert_batch_size: usize,

#[serde(default = "default::developer::hummock_time_travel_epoch_version_insert_batch_size")]
#[serde(default = "default::developer::hummock_delta_log_delete_batch_size")]
pub hummock_delta_log_delete_batch_size: usize,

#[serde(default = "default::developer::time_travel_vacuum_interval_sec")]
pub time_travel_vacuum_interval_sec: u64,

/// Max number of epoch-to-version inserted into meta store per INSERT, during time travel metadata writing.
#[serde(default = "default::developer::hummock_time_travel_epoch_version_insert_batch_size")]
pub hummock_time_travel_epoch_version_insert_batch_size: usize,
}

Expand Down Expand Up @@ -1479,7 +1485,7 @@ pub mod default {
}

pub fn vacuum_spin_interval_ms() -> u64 {
200
100
}

pub fn hummock_version_checkpoint_interval_sec() -> u64 {
Expand Down Expand Up @@ -2058,6 +2064,13 @@ pub mod default {
100
}

/// Default for `MetaDeveloperConfig::hummock_delta_log_delete_batch_size`.
///
/// The value is the batch size the Hummock manager uses when deleting version
/// delta logs while vacuuming metadata.
pub fn hummock_delta_log_delete_batch_size() -> usize {
    const DEFAULT_DELTA_LOG_DELETE_BATCH_SIZE: usize = 512;
    DEFAULT_DELTA_LOG_DELETE_BATCH_SIZE
}

/// Default for `MetaDeveloperConfig::time_travel_vacuum_interval_sec`, in seconds.
///
/// The value is the period of the background loop that vacuums time travel metadata.
pub fn time_travel_vacuum_interval_sec() -> u64 {
    const DEFAULT_TIME_TRAVEL_VACUUM_INTERVAL_SEC: u64 = 30;
    DEFAULT_TIME_TRAVEL_VACUUM_INTERVAL_SEC
}
/// Default for `MetaDeveloperConfig::hummock_time_travel_epoch_version_insert_batch_size`.
///
/// The value caps how many epoch-to-version entries are written to the meta store
/// per INSERT during time travel metadata writing.
pub fn hummock_time_travel_epoch_version_insert_batch_size() -> usize {
    const DEFAULT_EPOCH_VERSION_INSERT_BATCH_SIZE: usize = 1_000;
    DEFAULT_EPOCH_VERSION_INSERT_BATCH_SIZE
}
Expand Down
2 changes: 1 addition & 1 deletion src/config/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ This page is automatically generated by `./risedev generate-example-config`
| table_stat_throuput_window_seconds_for_merge | The window seconds of table throughput statistic history for merge compaction group. | 240 |
| table_stat_throuput_window_seconds_for_split | The window seconds of table throughput statistic history for split compaction group. | 60 |
| vacuum_interval_sec | Interval of invoking a vacuum job, to remove stale metadata from meta store and objects from object store. | 30 |
| vacuum_spin_interval_ms | The spin interval inside a vacuum job. It avoids the vacuum job monopolizing resources of meta node. | 200 |
| vacuum_spin_interval_ms | The spin interval inside a vacuum job. It avoids the vacuum job monopolizing resources of meta node. | 100 |

## meta.compaction_config

Expand Down
4 changes: 3 additions & 1 deletion src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ gc_history_retention_time_sec = 21600
max_inflight_time_travel_query = 1000
periodic_compaction_interval_sec = 60
vacuum_interval_sec = 30
vacuum_spin_interval_ms = 200
vacuum_spin_interval_ms = 100
hummock_version_checkpoint_interval_sec = 30
enable_hummock_data_archive = false
hummock_time_travel_snapshot_interval = 100
Expand Down Expand Up @@ -93,6 +93,8 @@ meta_actor_cnt_per_worker_parallelism_soft_limit = 100
meta_actor_cnt_per_worker_parallelism_hard_limit = 400
meta_hummock_time_travel_sst_info_fetch_batch_size = 10000
meta_hummock_time_travel_sst_info_insert_batch_size = 100
meta_hummock_delta_log_delete_batch_size = 512
meta_time_travel_vacuum_interval_sec = 30
meta_hummock_time_travel_epoch_version_insert_batch_size = 1000

[meta.meta_store_config]
Expand Down
8 changes: 8 additions & 0 deletions src/meta/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,10 @@ pub fn start(
compaction_deterministic_test: config.meta.enable_compaction_deterministic,
default_parallelism: config.meta.default_parallelism,
vacuum_interval_sec: config.meta.vacuum_interval_sec,
time_travel_vacuum_interval_sec: config
.meta
.developer
.time_travel_vacuum_interval_sec,
vacuum_spin_interval_ms: config.meta.vacuum_spin_interval_ms,
hummock_version_checkpoint_interval_sec: config
.meta
Expand All @@ -370,6 +374,10 @@ pub fn start(
.meta
.developer
.hummock_time_travel_sst_info_insert_batch_size,
hummock_delta_log_delete_batch_size: config
.meta
.developer
.hummock_delta_log_delete_batch_size,
hummock_time_travel_epoch_version_insert_batch_size: config
.meta
.developer
Expand Down
28 changes: 15 additions & 13 deletions src/meta/src/hummock/manager/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,23 +182,23 @@ impl HummockManager {
let mut versioning_guard = self.versioning.write().await;
let versioning = versioning_guard.deref_mut();
let context_info = self.context_info.read().await;
let deltas_to_delete = versioning
let deltas_to_delete_count = versioning
.hummock_version_deltas
.range(..=versioning.checkpoint.version.id)
.map(|(k, _)| *k)
.collect_vec();
.count();
// If there is any safe point, skip this to ensure meta backup has required delta logs to
// replay version.
if !context_info.version_safe_points.is_empty() {
return Ok((0, deltas_to_delete.len()));
return Ok((0, deltas_to_delete_count));
}
let mut hummock_version_deltas =
BTreeMapTransaction::new(&mut versioning.hummock_version_deltas);
let batch = deltas_to_delete
.iter()
let batch = versioning
.hummock_version_deltas
.range(..=versioning.checkpoint.version.id)
.map(|(k, _)| *k)
.take(batch_size)
.cloned()
.collect_vec();
let mut hummock_version_deltas =
BTreeMapTransaction::new(&mut versioning.hummock_version_deltas);
if batch.is_empty() {
return Ok((0, 0));
}
Expand All @@ -212,7 +212,7 @@ impl HummockManager {
drop(versioning_guard);
self.check_state_consistency().await;
}
Ok((batch.len(), deltas_to_delete.len() - batch.len()))
Ok((batch.len(), deltas_to_delete_count - batch.len()))
}

/// Filters by Hummock version and Writes GC history.
Expand Down Expand Up @@ -464,7 +464,7 @@ impl HummockManager {
///
/// Returns number of deleted deltas
pub async fn delete_metadata(&self) -> MetaResult<usize> {
let batch_size = 64usize;
let batch_size = self.env.opts.hummock_delta_log_delete_batch_size;
let mut total_deleted = 0;
loop {
if total_deleted != 0 && self.env.opts.vacuum_spin_interval_ms != 0 {
Expand All @@ -477,7 +477,10 @@ impl HummockManager {
break;
}
}
Ok(total_deleted)
}

pub async fn delete_time_travel_metadata(&self) -> MetaResult<()> {
let current_epoch_time = Epoch::now().physical_time();
let epoch_watermark = Epoch::from_physical_time(
current_epoch_time.saturating_sub(
Expand All @@ -489,8 +492,7 @@ impl HummockManager {
)
.0;
self.truncate_time_travel_metadata(epoch_watermark).await?;

Ok(total_deleted)
Ok(())
}

/// Deletes stale SST objects from object store.
Expand Down
24 changes: 21 additions & 3 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,11 +392,29 @@ impl HummockManager {
self.write_checkpoint(&versioning_guard.checkpoint).await?;
checkpoint_version
};
for version_delta in hummock_version_deltas.values() {
if version_delta.prev_id == redo_state.id {
redo_state.apply_version_delta(version_delta);
let mut applied_delta_count = 0;
let total_to_apply = hummock_version_deltas.range(redo_state.id + 1..).count();
tracing::info!(
total_delta = hummock_version_deltas.len(),
total_to_apply,
"Start redo Hummock version."
);
for version_delta in hummock_version_deltas
.range(redo_state.id + 1..)
.map(|(_, v)| v)
{
assert_eq!(
version_delta.prev_id, redo_state.id,
"delta prev_id {}, redo state id {}",
version_delta.prev_id, redo_state.id
);
redo_state.apply_version_delta(version_delta);
applied_delta_count += 1;
if applied_delta_count % 1000 == 0 {
tracing::info!("Redo progress {applied_delta_count}/{total_to_apply}.");
}
}
tracing::info!("Finish redo Hummock version.");
versioning_guard.version_stats = hummock_version_stats::Entity::find()
.one(&meta_store.conn)
.await
Expand Down
30 changes: 30 additions & 0 deletions src/meta/src/hummock/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ pub fn start_hummock_workers(
hummock_manager.clone(),
Duration::from_secs(meta_opts.vacuum_interval_sec),
),
start_vacuum_time_travel_metadata_loop(
hummock_manager.clone(),
Duration::from_secs(meta_opts.time_travel_vacuum_interval_sec),
),
];
workers
}
Expand Down Expand Up @@ -87,6 +91,32 @@ pub fn start_vacuum_metadata_loop(
(join_handle, shutdown_tx)
}

/// Spawns a background task that periodically vacuums time travel metadata.
///
/// On every tick of a timer with period `interval`, the task calls
/// `delete_time_travel_metadata` on `hummock_manager`. A failed round is only logged
/// as a warning; the loop keeps running and retries on the next tick.
///
/// Returns the join handle of the spawned task together with the oneshot sender
/// used to signal the loop to stop.
pub fn start_vacuum_time_travel_metadata_loop(
    hummock_manager: HummockManagerRef,
    interval: Duration,
) -> (JoinHandle<()>, Sender<()>) {
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let vacuum_task = async move {
        let mut ticker = tokio::time::interval(interval);
        // A slow vacuum round pushes later ticks back instead of firing a burst of them.
        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        loop {
            tokio::select! {
                // Timer fired: run one vacuum round.
                _ = ticker.tick() => {
                    if let Err(err) = hummock_manager.delete_time_travel_metadata().await {
                        tracing::warn!(error = %err.as_report(), "Vacuum time travel metadata error");
                    }
                },
                // Shutdown requested: leave the loop and finish the task.
                _ = &mut shutdown_rx => {
                    tracing::info!("Vacuum time travel metadata loop is stopped");
                    return;
                }
            }
        }
    };
    (tokio::spawn(vacuum_task), shutdown_tx)
}

pub fn start_checkpoint_loop(
hummock_manager: HummockManagerRef,
backup_manager: BackupManagerRef,
Expand Down
4 changes: 4 additions & 0 deletions src/meta/src/manager/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,14 @@ pub struct MetaOpts {
/// The spin interval inside a vacuum job. It avoids the vacuum job monopolizing resources of
/// meta node.
pub vacuum_spin_interval_ms: u64,
pub time_travel_vacuum_interval_sec: u64,
/// Interval of hummock version checkpoint.
pub hummock_version_checkpoint_interval_sec: u64,
pub enable_hummock_data_archive: bool,
pub hummock_time_travel_snapshot_interval: u64,
pub hummock_time_travel_sst_info_fetch_batch_size: usize,
pub hummock_time_travel_sst_info_insert_batch_size: usize,
pub hummock_delta_log_delete_batch_size: usize,
pub hummock_time_travel_epoch_version_insert_batch_size: usize,
/// The minimum delta log number a new checkpoint should compact, otherwise the checkpoint
/// attempt is rejected. Greater value reduces object store IO, meanwhile it results in
Expand Down Expand Up @@ -269,12 +271,14 @@ impl MetaOpts {
compaction_deterministic_test: false,
default_parallelism: DefaultParallelism::Full,
vacuum_interval_sec: 30,
time_travel_vacuum_interval_sec: 30,
vacuum_spin_interval_ms: 0,
hummock_version_checkpoint_interval_sec: 30,
enable_hummock_data_archive: false,
hummock_time_travel_snapshot_interval: 0,
hummock_time_travel_sst_info_fetch_batch_size: 10_000,
hummock_time_travel_sst_info_insert_batch_size: 10,
hummock_delta_log_delete_batch_size: 1000,
hummock_time_travel_epoch_version_insert_batch_size: 1000,
min_delta_log_num_for_hummock_version_checkpoint: 1,
min_sst_retention_time_sec: 3600 * 24 * 7,
Expand Down

0 comments on commit 7e26b2f

Please sign in to comment.