Skip to content

Commit 2e69c7f

Browse files
authored
feat: enable analyze hook after DML (#18754)
* fix * update * fix * update * update * add enable_auto_analyze * enable auto analyze * fix * fix test * fix test * fix test * fix * fix * add test * add test * add test * fix review comments
1 parent a263305 commit 2e69c7f

File tree

56 files changed

+1439
-1123
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+1439
-1123
lines changed

src/query/catalog/src/statistics/basic_statistics.rs

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ impl BasicColumnStatistics {
105105
}
106106

107107
// Get useful statistics: min, max and ndv are all `Some(_)`.
108-
pub fn get_useful_stat(&self, num_rows: u64) -> Option<Self> {
108+
pub fn get_useful_stat(&self, num_rows: u64, stats_row_count: u64) -> Option<Self> {
109109
if self.min.is_none() || self.max.is_none() {
110110
return None;
111111
}
@@ -119,9 +119,8 @@ impl BasicColumnStatistics {
119119
self.max.clone().unwrap(),
120120
);
121121
let ndv = match ndv {
122-
Some(0) => Some(num_rows),
123122
None => Some(num_rows),
124-
_ => ndv,
123+
Some(v) => Some(Self::estimate_ndv(v, stats_row_count, num_rows)),
125124
};
126125
Some(Self {
127126
min: self.min.clone(),
@@ -131,4 +130,41 @@ impl BasicColumnStatistics {
131130
in_memory_size: self.in_memory_size,
132131
})
133132
}
133+
134+
// Extrapolates the number of distinct values `ndv`, observed while
// `stats_row_count` rows were covered by statistics, to a table that now
// holds `num_rows` rows.
//
// Inspired by duckdb (https://github.com/duckdb/duckdb/blob/main/src/storage/statistics/distinct_statistics.cpp#L55-L69)
//
// Returns:
// - `num_rows` when `ndv` or `stats_row_count` is zero (nothing to extrapolate from);
// - `min(ndv, num_rows)` when the statistics cover at least `num_rows` rows;
// - otherwise the scaled-up estimate, never larger than `num_rows`.
fn estimate_ndv(ndv: u64, stats_row_count: u64, num_rows: u64) -> u64 {
    if stats_row_count == 0 || ndv == 0 {
        return num_rows;
    }

    if stats_row_count >= num_rows {
        return ndv.min(num_rows);
    }

    let s = stats_row_count as f64;
    let n = num_rows as f64;
    // The observed distinct count cannot exceed the number of rows it was computed from.
    let u = ndv.min(stats_row_count) as f64;

    let u1 = (u / s).powi(2) * u;
    // Good–Turing Estimation
    let estimate = u + u1 / s * (n - s);

    // `n` may have been rounded *up* when `num_rows` is not exactly representable
    // as an f64, so the float clamp alone cannot guarantee `result <= num_rows`.
    // Cap once more in integer space.
    let clamped = estimate.round().clamp(0.0, n) as u64;
    clamped.min(num_rows)
}
154+
}
155+
156+
#[cfg(test)]
mod tests {
    use super::BasicColumnStatistics;

    // Table-driven check of `estimate_ndv` against fixed expectations.
    #[test]
    fn test_estimate_ndv() {
        // (ndv, stats_row_count, num_rows, expected estimate)
        let cases: [(u64, u64, u64, u64); 4] = [
            (0, 1, 3, 3),
            (1, 1, 3, 3),
            (12, 100, 3000, 17),
            (6000, 10000, 1000000, 219840),
        ];

        for (ndv, stats_row_count, num_rows, expected) in cases {
            assert_eq!(
                BasicColumnStatistics::estimate_ndv(ndv, stats_row_count, num_rows),
                expected
            );
        }
    }
}

src/query/catalog/src/table_context.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,12 @@ pub trait TableContext: Send + Sync {
187187
fn get_compaction_num_block_hint(&self, _table_name: &str) -> u64 {
188188
unimplemented!()
189189
}
190+
/// Reports whether auto analyze is enabled for this context.
/// NOTE(review): meaning inferred from the name and the `bool` return type — confirm against implementors.
/// This default implementation panics via `unimplemented!()`.
fn get_enable_auto_analyze(&self) -> bool {
191+
unimplemented!()
192+
}
193+
/// Sets the auto-analyze flag for this context.
/// NOTE(review): presumably the counterpart of `get_enable_auto_analyze` — confirm against implementors.
/// This default implementation ignores `_enable` and panics via `unimplemented!()`.
fn set_enable_auto_analyze(&self, _enable: bool) {
194+
unimplemented!()
195+
}
190196

191197
fn attach_query_str(&self, kind: QueryKind, query: String);
192198
fn attach_query_hash(&self, text_hash: String, parameterized_hash: String);

src/query/config/src/config.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3249,13 +3249,13 @@ pub struct CacheConfig {
32493249
)]
32503250
pub table_meta_statistic_count: u64,
32513251

3252-
/// Max number of cached segment statistic meta
3252+
/// Max bytes of cached segment statistic meta
32533253
#[clap(
3254-
long = "cache-segment-statistic-count",
3254+
long = "cache-segment-statistic-bytes",
32553255
value_name = "VALUE",
3256-
default_value = "0"
3256+
default_value = "1073741824"
32573257
)]
3258-
pub segment_statistics_count: u64,
3258+
pub segment_statistics_bytes: u64,
32593259

32603260
/// Enable bloom index cache. Default is enabled. Set it to false to disable all the bloom index caches
32613261
#[clap(
@@ -3719,7 +3719,7 @@ mod cache_config_converters {
37193719
block_meta_count: value.block_meta_count,
37203720
segment_block_metas_count: value.segment_block_metas_count,
37213721
table_meta_statistic_count: value.table_meta_statistic_count,
3722-
segment_statistics_count: value.segment_statistics_count,
3722+
segment_statistics_bytes: value.segment_statistics_bytes,
37233723
enable_table_index_bloom: value.enable_table_bloom_index_cache,
37243724
table_bloom_index_meta_count: value.table_bloom_index_meta_count,
37253725
table_bloom_index_filter_count: value.table_bloom_index_filter_count,
@@ -3756,7 +3756,7 @@ mod cache_config_converters {
37563756
table_meta_snapshot_count: value.table_meta_snapshot_count,
37573757
table_meta_segment_bytes: value.table_meta_segment_bytes,
37583758
table_meta_statistic_count: value.table_meta_statistic_count,
3759-
segment_statistics_count: value.segment_statistics_count,
3759+
segment_statistics_bytes: value.segment_statistics_bytes,
37603760
block_meta_count: value.block_meta_count,
37613761
enable_table_bloom_index_cache: value.enable_table_index_bloom,
37623762
table_bloom_index_meta_count: value.table_bloom_index_meta_count,

src/query/config/src/inner.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -575,11 +575,11 @@ pub struct CacheConfig {
575575
/// Note that a segment may contain multiple block metadata entries.
576576
pub segment_block_metas_count: u64,
577577

578-
/// Max number of cached table segment
578+
/// Max number of cached table statistics
579579
pub table_meta_statistic_count: u64,
580580

581-
/// Max number of cached segment statistics
582-
pub segment_statistics_count: u64,
581+
/// Max bytes of cached segment statistics
582+
pub segment_statistics_bytes: u64,
583583

584584
/// Enable bloom index cache. Default is enabled. Set it to false to disable all the bloom index caches
585585
pub enable_table_index_bloom: bool,
@@ -749,7 +749,7 @@ impl Default for CacheConfig {
749749
block_meta_count: 0,
750750
segment_block_metas_count: 0,
751751
table_meta_statistic_count: 256,
752-
segment_statistics_count: 0,
752+
segment_statistics_bytes: 1073741824,
753753
enable_table_index_bloom: true,
754754
table_bloom_index_meta_count: 3000,
755755
table_bloom_index_filter_count: 0,

src/query/service/src/clusters/cluster.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -789,7 +789,7 @@ impl ClusterDiscovery {
789789
"block_meta_count": cfg.cache.block_meta_count,
790790
"segment_block_metas_count": cfg.cache.segment_block_metas_count,
791791
"table_meta_statistic_count": cfg.cache.table_meta_statistic_count,
792-
"segment_statistics_count": cfg.cache.segment_statistics_count,
792+
"segment_statistics_bytes": cfg.cache.segment_statistics_bytes,
793793
"enable_table_index_bloom": cfg.cache.enable_table_index_bloom,
794794
"table_bloom_index_meta_count": cfg.cache.table_bloom_index_meta_count,
795795
"table_bloom_index_filter_count": cfg.cache.table_bloom_index_filter_count,

src/query/service/src/interpreters/common/table_option_validation.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use databend_common_storages_fuse::FUSE_OPT_KEY_BLOCK_IN_MEM_SIZE_THRESHOLD;
3030
use databend_common_storages_fuse::FUSE_OPT_KEY_BLOCK_PER_SEGMENT;
3131
use databend_common_storages_fuse::FUSE_OPT_KEY_DATA_RETENTION_NUM_SNAPSHOTS_TO_KEEP;
3232
use databend_common_storages_fuse::FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS;
33+
use databend_common_storages_fuse::FUSE_OPT_KEY_ENABLE_AUTO_ANALYZE;
3334
use databend_common_storages_fuse::FUSE_OPT_KEY_ENABLE_AUTO_VACUUM;
3435
use databend_common_storages_fuse::FUSE_OPT_KEY_FILE_SIZE;
3536
use databend_common_storages_fuse::FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD;
@@ -69,6 +70,7 @@ pub static CREATE_FUSE_OPTIONS: LazyLock<HashSet<&'static str>> = LazyLock::new(
6970
r.insert(FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS);
7071
r.insert(FUSE_OPT_KEY_DATA_RETENTION_NUM_SNAPSHOTS_TO_KEEP);
7172
r.insert(FUSE_OPT_KEY_ENABLE_AUTO_VACUUM);
73+
r.insert(FUSE_OPT_KEY_ENABLE_AUTO_ANALYZE);
7274

7375
r.insert(OPT_KEY_BLOOM_INDEX_COLUMNS);
7476
r.insert(OPT_KEY_APPROX_DISTINCT_COLUMNS);
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::collections::HashMap;
16+
use std::sync::Arc;
17+
18+
use databend_common_base::runtime::GlobalIORuntime;
19+
use databend_common_catalog::table_context::TableContext;
20+
use databend_common_exception::Result;
21+
use databend_common_pipeline_core::ExecutionInfo;
22+
use databend_common_pipeline_core::Pipeline;
23+
use databend_common_storages_fuse::FuseTable;
24+
use log::info;
25+
26+
use crate::pipelines::executor::ExecutorSettings;
27+
use crate::pipelines::executor::PipelineCompleteExecutor;
28+
use crate::sessions::QueryContext;
29+
30+
/// Names the table an analyze job should run against.
#[derive(Debug, Clone)]
pub struct AnalyzeDesc {
    /// Catalog name used when looking the table up.
    pub catalog: String,
    /// Database name used when looking the table up.
    pub database: String,
    /// Table name used when looking the table up.
    pub table: String,
}
35+
36+
/// Hook analyze action with a on-finished callback.
37+
/// errors (if any) are ignored.
38+
pub async fn hook_analyze(ctx: Arc<QueryContext>, pipeline: &mut Pipeline, desc: AnalyzeDesc) {
39+
if pipeline.is_empty() {
40+
return;
41+
}
42+
43+
pipeline.set_on_finished(move |info: &ExecutionInfo| {
44+
if info.res.is_ok() {
45+
info!("[ANALYZE-HOOK] Pipeline execution completed successfully, starting analyze job");
46+
if !ctx.get_enable_auto_analyze() {
47+
return Ok(());
48+
}
49+
50+
match GlobalIORuntime::instance().block_on(do_analyze(ctx, desc)) {
51+
Ok(_) => {
52+
info!("[ANALYZE-HOOK] Analyze job completed successfully");
53+
}
54+
Err(e) => {
55+
info!("[ANALYZE-HOOK] Analyze job failed: {:?}", e);
56+
}
57+
}
58+
}
59+
Ok(())
60+
});
61+
}
62+
63+
/// hook the analyze action with a on-finished callback.
64+
async fn do_analyze(ctx: Arc<QueryContext>, desc: AnalyzeDesc) -> Result<()> {
65+
// evict the table from cache
66+
ctx.evict_table_from_cache(&desc.catalog, &desc.database, &desc.table)?;
67+
ctx.clear_table_meta_timestamps_cache();
68+
69+
let table = ctx
70+
.get_table(&desc.catalog, &desc.database, &desc.table)
71+
.await?;
72+
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
73+
let mut pipeline = Pipeline::create();
74+
let Some(table_snapshot) = fuse_table.read_table_snapshot().await? else {
75+
return Ok(());
76+
};
77+
fuse_table.do_analyze(
78+
ctx.clone(),
79+
table_snapshot,
80+
&mut pipeline,
81+
HashMap::new(),
82+
true,
83+
)?;
84+
pipeline.set_max_threads(ctx.get_settings().get_max_threads()? as usize);
85+
let executor_settings = ExecutorSettings::try_create(ctx.clone())?;
86+
let pipelines = vec![pipeline];
87+
let complete_executor = PipelineCompleteExecutor::from_pipelines(pipelines, executor_settings)?;
88+
ctx.set_executor(complete_executor.get_inner())?;
89+
complete_executor.execute()?;
90+
Ok(())
91+
}

src/query/service/src/interpreters/hook/compact_hook.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -149,13 +149,6 @@ async fn compact_table(
149149
compaction_limits: CompactionLimits,
150150
lock_opt: LockTableOption,
151151
) -> Result<()> {
152-
let table = ctx
153-
.get_table(
154-
&compact_target.catalog,
155-
&compact_target.database,
156-
&compact_target.table,
157-
)
158-
.await?;
159152
let settings = ctx.get_settings();
160153

161154
// evict the table from cache
@@ -213,6 +206,13 @@ async fn compact_table(
213206
}
214207

215208
{
209+
let table = ctx
210+
.get_table(
211+
&compact_target.catalog,
212+
&compact_target.database,
213+
&compact_target.table,
214+
)
215+
.await?;
216216
// do recluster.
217217
if let Some(cluster_type) = table.cluster_type() {
218218
if cluster_type == ClusterType::Linear {

src/query/service/src/interpreters/hook/hook.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ use databend_common_sql::executor::physical_plans::MutationKind;
2222
use log::info;
2323
use log::warn;
2424

25+
use crate::interpreters::hook::analyze_hook::hook_analyze;
26+
use crate::interpreters::hook::analyze_hook::AnalyzeDesc;
2527
use crate::interpreters::hook::compact_hook::hook_compact;
2628
use crate::interpreters::hook::compact_hook::CompactHookTraceCtx;
2729
use crate::interpreters::hook::compact_hook::CompactTargetTableDescription;
@@ -68,6 +70,7 @@ impl HookOperator {
6870
pub async fn execute(&self, pipeline: &mut Pipeline) {
6971
self.execute_compact(pipeline).await;
7072
self.execute_refresh(pipeline).await;
73+
self.execute_analyze(pipeline).await;
7174
}
7275

7376
/// Execute the compact hook operator.
@@ -125,4 +128,17 @@ impl HookOperator {
125128

126129
hook_refresh(self.ctx.clone(), pipeline, refresh_desc).await;
127130
}
131+
132+
/// Execute the analyze hook operator.
133+
#[fastrace::trace]
134+
#[async_backtrace::framed]
135+
pub async fn execute_analyze(&self, pipeline: &mut Pipeline) {
136+
let desc = AnalyzeDesc {
137+
catalog: self.catalog.to_owned(),
138+
database: self.database.to_owned(),
139+
table: self.table.to_owned(),
140+
};
141+
142+
hook_analyze(self.ctx.clone(), pipeline, desc).await;
143+
}
128144
}

src/query/service/src/interpreters/hook/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
pub(crate) mod analyze_hook;
1516
pub(crate) mod compact_hook;
1617
pub(crate) mod refresh_hook;
1718
pub(crate) mod vacuum_hook;

0 commit comments

Comments
 (0)