Skip to content

Commit 482a40e

Browse files
authored
Merge pull request #9318 from dantengsky/feat-purge-before
feat: purge at given point
2 parents 3ab2ba0 + 166fe24 commit 482a40e

File tree

17 files changed

+147
-182
lines changed

17 files changed

+147
-182
lines changed

src/query/ast/src/ast/format/ast_format.rs

+3-5
Original file line numberDiff line numberDiff line change
@@ -1377,11 +1377,9 @@ impl<'ast> Visitor<'ast> for AstFormatVisitor {
13771377
let mut children = Vec::new();
13781378
self.visit_table_ref(&stmt.catalog, &stmt.database, &stmt.table);
13791379
children.push(self.children.pop().unwrap());
1380-
if let Some(action) = &stmt.action {
1381-
let action_name = format!("Action {}", action);
1382-
let action_format_ctx = AstFormatContext::new(action_name);
1383-
children.push(FormatTreeNode::new(action_format_ctx));
1384-
}
1380+
let action_name = format!("Action {}", stmt.action);
1381+
let action_format_ctx = AstFormatContext::new(action_name);
1382+
children.push(FormatTreeNode::new(action_format_ctx));
13851383

13861384
let name = "OptimizeTable".to_string();
13871385
let format_ctx = AstFormatContext::with_children(name, children.len());

src/query/ast/src/ast/statements/table.rs

+13-7
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ impl Display for RenameTableStmt<'_> {
364364
}
365365
}
366366

367-
#[derive(Debug, Clone, PartialEq, Eq)]
367+
#[derive(Debug, Clone, PartialEq)]
368368
pub struct TruncateTableStmt<'a> {
369369
pub catalog: Option<Identifier<'a>>,
370370
pub database: Option<Identifier<'a>>,
@@ -395,7 +395,7 @@ pub struct OptimizeTableStmt<'a> {
395395
pub catalog: Option<Identifier<'a>>,
396396
pub database: Option<Identifier<'a>>,
397397
pub table: Identifier<'a>,
398-
pub action: Option<OptimizeTableAction<'a>>,
398+
pub action: OptimizeTableAction<'a>,
399399
}
400400

401401
impl Display for OptimizeTableStmt<'_> {
@@ -408,9 +408,7 @@ impl Display for OptimizeTableStmt<'_> {
408408
.chain(&self.database)
409409
.chain(Some(&self.table)),
410410
)?;
411-
if let Some(action) = &self.action {
412-
write!(f, " {action}")?;
413-
}
411+
write!(f, " {}", &self.action)?;
414412

415413
Ok(())
416414
}
@@ -488,7 +486,9 @@ pub enum CompactTarget {
488486
#[derive(Debug, Clone, PartialEq)]
489487
pub enum OptimizeTableAction<'a> {
490488
All,
491-
Purge,
489+
Purge {
490+
before: Option<TimeTravelPoint<'a>>,
491+
},
492492
Compact {
493493
target: CompactTarget,
494494
limit: Option<Expr<'a>>,
@@ -499,7 +499,13 @@ impl<'a> Display for OptimizeTableAction<'a> {
499499
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
500500
match self {
501501
OptimizeTableAction::All => write!(f, "ALL"),
502-
OptimizeTableAction::Purge => write!(f, "PURGE"),
502+
OptimizeTableAction::Purge { before } => {
503+
write!(f, "PURGE")?;
504+
if let Some(point) = before {
505+
write!(f, " BEFORE {}", point)?;
506+
}
507+
Ok(())
508+
}
503509
OptimizeTableAction::Compact { target, limit } => {
504510
match target {
505511
CompactTarget::Block => {

src/query/ast/src/parser/statement.rs

+7-2
Original file line numberDiff line numberDiff line change
@@ -516,7 +516,7 @@ pub fn statement(i: Input) -> IResult<StatementMsg> {
516516
);
517517
let optimize_table = map(
518518
rule! {
519-
OPTIMIZE ~ TABLE ~ #peroid_separated_idents_1_to_3 ~ #optimize_table_action?
519+
OPTIMIZE ~ TABLE ~ #peroid_separated_idents_1_to_3 ~ #optimize_table_action
520520
},
521521
|(_, _, (catalog, database, table), action)| {
522522
Statement::OptimizeTable(OptimizeTableStmt {
@@ -1483,7 +1483,12 @@ pub fn alter_table_action(i: Input) -> IResult<AlterTableAction> {
14831483
pub fn optimize_table_action(i: Input) -> IResult<OptimizeTableAction> {
14841484
alt((
14851485
value(OptimizeTableAction::All, rule! { ALL }),
1486-
value(OptimizeTableAction::Purge, rule! { PURGE }),
1486+
map(
1487+
rule! { PURGE ~ (BEFORE ~ #travel_point)?},
1488+
|(_, opt_travel_point)| OptimizeTableAction::Purge {
1489+
before: opt_travel_point.map(|(_, p)| p),
1490+
},
1491+
),
14871492
map(
14881493
rule! { COMPACT ~ (SEGMENT)? ~ ( LIMIT ~ ^#expr )?},
14891494
|(_, opt_segment, opt_limit)| OptimizeTableAction::Compact {

src/query/ast/src/parser/token.rs

+2
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,8 @@ pub enum TokenKind {
269269
AWS_SECRET_KEY,
270270
#[token("ANTI", ignore(ascii_case))]
271271
ANTI,
272+
#[token("BEFORE", ignore(ascii_case))]
273+
BEFORE,
272274
#[token("BETWEEN", ignore(ascii_case))]
273275
BETWEEN,
274276
#[token("BIGINT", ignore(ascii_case))]

src/query/catalog/src/table.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,7 @@ pub trait TableExt: Table {
320320

321321
impl<T: ?Sized> TableExt for T where T: Table {}
322322

323-
#[derive(Debug, Clone)]
323+
#[derive(Debug, Clone, Eq, PartialEq)]
324324
pub enum NavigationPoint {
325325
SnapshotID(String),
326326
TimePoint(DateTime<Utc>),

src/query/service/src/interpreters/interpreter_table_optimize.rs

+8-2
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ impl Interpreter for OptimizeTableInterpreter {
5555
let action = &plan.action;
5656
let do_purge = matches!(
5757
action,
58-
OptimizeTableAction::Purge | OptimizeTableAction::All
58+
OptimizeTableAction::Purge(_) | OptimizeTableAction::All
5959
);
6060
let do_compact_blocks = matches!(
6161
action,
@@ -114,7 +114,13 @@ impl Interpreter for OptimizeTableInterpreter {
114114
}
115115

116116
if do_purge {
117-
table.purge(self.ctx.clone(), true).await?;
117+
let table = if let OptimizeTableAction::Purge(Some(point)) = action {
118+
table.navigate_to(point).await?
119+
} else {
120+
table
121+
};
122+
let keep_latest = true;
123+
table.purge(self.ctx.clone(), keep_latest).await?;
118124
}
119125

120126
Ok(PipelineBuildResult::create())

src/query/service/tests/it/storages/fuse/operations/navigate.rs

-74
Original file line numberDiff line numberDiff line change
@@ -12,21 +12,17 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::ops::Add;
1615
use std::ops::Sub;
1716
use std::time::Duration;
1817

1918
use common_base::base::tokio;
20-
use common_catalog::table::AppendMode;
2119
use common_datablocks::DataBlock;
2220
use common_exception::ErrorCode;
2321
use common_exception::Result;
2422
use common_storages_fuse::io::SnapshotHistoryReader;
25-
use databend_query::pipelines::Pipeline;
2623
use databend_query::storages::fuse::io::MetaReaders;
2724
use databend_query::storages::fuse::io::TableMetaLocationGenerator;
2825
use databend_query::storages::fuse::FuseTable;
29-
use databend_query::storages::Table;
3026
use futures::TryStreamExt;
3127

3228
use crate::storages::fuse::table_test_fixture::execute_query;
@@ -121,73 +117,3 @@ async fn test_fuse_navigate() -> Result<()> {
121117
};
122118
Ok(())
123119
}
124-
125-
#[tokio::test]
126-
async fn test_fuse_historical_table_is_read_only() -> Result<()> {
127-
// 1. Setup
128-
let fixture = TestFixture::new().await;
129-
let db = fixture.default_db_name();
130-
let tbl = fixture.default_table_name();
131-
let ctx = fixture.ctx();
132-
fixture.create_default_table().await?;
133-
134-
let qry = format!("insert into {}.{} values (1, (2, 3))", db, tbl);
135-
execute_query(ctx.clone(), qry.as_str())
136-
.await?
137-
.try_collect::<Vec<DataBlock>>()
138-
.await?;
139-
140-
// 2. grab the history
141-
let table = fixture.latest_default_table().await?;
142-
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
143-
let loc = fuse_table.snapshot_loc().await?.unwrap();
144-
let reader = MetaReaders::table_snapshot_reader(fuse_table.get_operator());
145-
let version = TableMetaLocationGenerator::snapshot_version(loc.as_str());
146-
let snapshots: Vec<_> = reader
147-
.snapshot_history(loc, version, fuse_table.meta_location_generator().clone())
148-
.try_collect()
149-
.await?;
150-
151-
let snapshot = &snapshots[0];
152-
let instant = snapshot
153-
.timestamp
154-
.unwrap()
155-
.add(chrono::Duration::milliseconds(1));
156-
let tbl = fuse_table.navigate_to_time_point(instant).await?;
157-
158-
// check append2
159-
let res = tbl.append_data(
160-
ctx.clone(),
161-
&mut Pipeline::create(),
162-
AppendMode::Normal,
163-
false,
164-
);
165-
assert_not_writable(res, "append2");
166-
167-
// check append_data
168-
let res = fixture
169-
.append_commit_blocks(tbl.clone(), vec![], false, true)
170-
.await;
171-
assert_not_writable(res, "append2");
172-
173-
// check truncate
174-
let res = tbl.truncate(ctx.clone(), false).await;
175-
assert_not_writable(res, "truncate");
176-
177-
Ok(())
178-
}
179-
180-
fn assert_not_writable<T>(res: Result<T>, case_name: &str) {
181-
match res {
182-
Ok(_) => panic!(
183-
"historical table should NOT be writable, case {}",
184-
case_name
185-
),
186-
Err(e) => assert_eq!(
187-
e.code(),
188-
ErrorCode::TABLE_NOT_WRITABLE,
189-
" case {}",
190-
case_name
191-
),
192-
}
193-
}

src/query/service/tests/it/storages/fuse/operations/optimize.rs

+2-17
Original file line numberDiff line numberDiff line change
@@ -23,25 +23,10 @@ use crate::storages::fuse::table_test_fixture::TestFixture;
2323
use crate::storages::fuse::utils::do_purge_test;
2424
use crate::storages::fuse::utils::TestTableOperation;
2525

26-
#[tokio::test]
27-
async fn test_fuse_snapshot_optimize() -> Result<()> {
28-
do_purge_test(
29-
"implicit pure",
30-
TestTableOperation::Optimize("".to_string()),
31-
1,
32-
0,
33-
1,
34-
1,
35-
1,
36-
None,
37-
)
38-
.await
39-
}
40-
4126
#[tokio::test]
4227
async fn test_fuse_snapshot_optimize_purge() -> Result<()> {
4328
do_purge_test(
44-
"explicit pure",
29+
"explicit purge",
4530
TestTableOperation::Optimize("purge".to_string()),
4631
1,
4732
0,
@@ -56,7 +41,7 @@ async fn test_fuse_snapshot_optimize_purge() -> Result<()> {
5641
#[tokio::test]
5742
async fn test_fuse_snapshot_optimize_all() -> Result<()> {
5843
do_purge_test(
59-
"explicit pure",
44+
"explicit purge",
6045
TestTableOperation::Optimize("all".to_string()),
6146
1,
6247
0,

src/query/sql/src/planner/binder/binder.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ impl<'a> Binder {
160160
Statement::AlterTable(stmt) => self.bind_alter_table(bind_context, stmt).await?,
161161
Statement::RenameTable(stmt) => self.bind_rename_table(stmt).await?,
162162
Statement::TruncateTable(stmt) => self.bind_truncate_table(stmt).await?,
163-
Statement::OptimizeTable(stmt) => self.bind_optimize_table(stmt).await?,
163+
Statement::OptimizeTable(stmt) => self.bind_optimize_table(bind_context, stmt).await?,
164164
Statement::AnalyzeTable(stmt) => self.bind_analyze_table(stmt).await?,
165165
Statement::ExistsTable(stmt) => self.bind_exists_table(stmt).await?,
166166

src/query/sql/src/planner/binder/ddl/table.rs

+26-21
Original file line numberDiff line numberDiff line change
@@ -739,13 +739,14 @@ impl<'a> Binder {
739739

740740
pub(in crate::planner::binder) async fn bind_optimize_table(
741741
&mut self,
742+
bind_context: &BindContext,
742743
stmt: &OptimizeTableStmt<'a>,
743744
) -> Result<Plan> {
744745
let OptimizeTableStmt {
745746
catalog,
746747
database,
747748
table,
748-
action,
749+
action: ast_action,
749750
} = stmt;
750751

751752
let catalog = catalog
@@ -757,29 +758,33 @@ impl<'a> Binder {
757758
.map(|ident| normalize_identifier(ident, &self.name_resolution_ctx).name)
758759
.unwrap_or_else(|| self.ctx.get_current_database());
759760
let table = normalize_identifier(table, &self.name_resolution_ctx).name;
760-
let action = if let Some(ast_action) = action {
761-
match ast_action {
762-
AstOptimizeTableAction::All => OptimizeTableAction::All,
763-
AstOptimizeTableAction::Purge => OptimizeTableAction::Purge,
764-
AstOptimizeTableAction::Compact { target, limit } => {
765-
let limit_cnt = match limit {
766-
Some(Expr::Literal {
767-
lit: Literal::Integer(uint),
768-
..
769-
}) => Some(*uint as usize),
770-
Some(_) => {
771-
return Err(ErrorCode::IllegalDataType("Unsupported limit type"));
772-
}
773-
_ => None,
774-
};
775-
match target {
776-
CompactTarget::Block => OptimizeTableAction::CompactBlocks(limit_cnt),
777-
CompactTarget::Segment => OptimizeTableAction::CompactSegments(limit_cnt),
761+
let action = match ast_action {
762+
AstOptimizeTableAction::All => OptimizeTableAction::All,
763+
AstOptimizeTableAction::Purge { before } => {
764+
let p = if let Some(point) = before {
765+
let point = self.resolve_data_travel_point(bind_context, point).await?;
766+
Some(point)
767+
} else {
768+
None
769+
};
770+
OptimizeTableAction::Purge(p)
771+
}
772+
AstOptimizeTableAction::Compact { target, limit } => {
773+
let limit_cnt = match limit {
774+
Some(Expr::Literal {
775+
lit: Literal::Integer(uint),
776+
..
777+
}) => Some(*uint as usize),
778+
Some(_) => {
779+
return Err(ErrorCode::IllegalDataType("Unsupported limit type"));
778780
}
781+
_ => None,
782+
};
783+
match target {
784+
CompactTarget::Block => OptimizeTableAction::CompactBlocks(limit_cnt),
785+
CompactTarget::Segment => OptimizeTableAction::CompactSegments(limit_cnt),
779786
}
780787
}
781-
} else {
782-
OptimizeTableAction::Purge
783788
};
784789

785790
Ok(Plan::OptimizeTable(Box::new(OptimizeTablePlan {

src/query/sql/src/planner/plans/ddl/table.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use std::collections::BTreeMap;
1616
use std::sync::Arc;
1717

1818
use common_ast::ast::Engine;
19+
use common_catalog::table::NavigationPoint;
1920
use common_datavalues::DataField;
2021
use common_datavalues::DataSchema;
2122
use common_datavalues::DataSchemaRef;
@@ -117,11 +118,10 @@ impl OptimizeTablePlan {
117118
}
118119
}
119120

120-
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
121+
#[derive(Clone, Debug, PartialEq, Eq)]
121122
pub enum OptimizeTableAction {
122123
All,
123-
Purge,
124-
Statistic,
124+
Purge(Option<NavigationPoint>),
125125
CompactBlocks(Option<usize>),
126126
CompactSegments(Option<usize>),
127127
}

0 commit comments

Comments
 (0)