Skip to content

Commit 8be688c

Browse files
authored
chore(cubesql): QueryEngine - support custom metadata for QueryPlan (#8817)
1 parent 70a36d8 commit 8be688c

File tree

6 files changed

+46
-49
lines changed

6 files changed

+46
-49
lines changed

rust/cubesql/cubesql/src/compile/plan.rs

+14-23
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,9 @@ pub enum QueryPlan {
4545
MetaOk(StatusFlags, CommandCompletion),
4646
MetaTabular(StatusFlags, Box<dataframe::DataFrame>),
4747
// Query will be executed via Data Fusion
48-
DataFusionSelect(StatusFlags, LogicalPlan, DFSessionContext),
48+
DataFusionSelect(LogicalPlan, DFSessionContext),
4949
// Query will be executed via DataFusion and saved to session
50-
CreateTempTable(
51-
StatusFlags,
52-
LogicalPlan,
53-
DFSessionContext,
54-
String,
55-
Arc<TempTableManager>,
56-
),
50+
CreateTempTable(LogicalPlan, DFSessionContext, String, Arc<TempTableManager>),
5751
}
5852

5953
impl fmt::Debug for QueryPlan {
@@ -70,16 +64,13 @@ impl fmt::Debug for QueryPlan {
7064
flags
7165
))
7266
},
73-
QueryPlan::DataFusionSelect(flags, _, _) => {
74-
f.write_str(&format!(
75-
"DataFusionSelect(StatusFlags: {:?}, LogicalPlan: hidden, DFSessionContext: hidden)",
76-
flags
77-
))
67+
QueryPlan::DataFusionSelect(_, _) => {
68+
f.write_str(&"DataFusionSelect(LogicalPlan: hidden, DFSessionContext: hidden)")
7869
},
79-
QueryPlan::CreateTempTable(flags, _, _, name, _) => {
70+
QueryPlan::CreateTempTable(_, _, name, _) => {
8071
f.write_str(&format!(
81-
"CreateTempTable(StatusFlags: {:?}, LogicalPlan: hidden, DFSessionContext: hidden, Name: {:?}, SessionState: hidden",
82-
flags, name
72+
"CreateTempTable(LogicalPlan: hidden, DFSessionContext: hidden, Name: {}, SessionState: hidden",
73+
name
8374
))
8475
},
8576
}
@@ -89,8 +80,9 @@ impl fmt::Debug for QueryPlan {
8980
impl QueryPlan {
9081
pub fn as_logical_plan(&self) -> LogicalPlan {
9182
match self {
92-
QueryPlan::DataFusionSelect(_, plan, _)
93-
| QueryPlan::CreateTempTable(_, plan, _, _, _) => plan.clone(),
83+
QueryPlan::DataFusionSelect(plan, _) | QueryPlan::CreateTempTable(plan, _, _, _) => {
84+
plan.clone()
85+
}
9486
QueryPlan::MetaOk(_, _) | QueryPlan::MetaTabular(_, _) => {
9587
panic!("This query doesnt have a plan, because it already has values for response")
9688
}
@@ -99,8 +91,8 @@ impl QueryPlan {
9991

10092
pub async fn as_physical_plan(&self) -> Result<Arc<dyn ExecutionPlan>, CubeError> {
10193
match self {
102-
QueryPlan::DataFusionSelect(_, plan, ctx)
103-
| QueryPlan::CreateTempTable(_, plan, ctx, _, _) => {
94+
QueryPlan::DataFusionSelect(plan, ctx)
95+
| QueryPlan::CreateTempTable(plan, ctx, _, _) => {
10496
DataFrame::new(ctx.state.clone(), plan)
10597
.create_physical_plan()
10698
.await
@@ -114,8 +106,7 @@ impl QueryPlan {
114106

115107
pub fn print(&self, pretty: bool) -> Result<String, CubeError> {
116108
match self {
117-
QueryPlan::DataFusionSelect(_, plan, _)
118-
| QueryPlan::CreateTempTable(_, plan, _, _, _) => {
109+
QueryPlan::DataFusionSelect(plan, _) | QueryPlan::CreateTempTable(plan, _, _, _) => {
119110
if pretty {
120111
Ok(plan.display_indent().to_string())
121112
} else {
@@ -134,7 +125,7 @@ pub async fn get_df_batches(
134125
plan: &QueryPlan,
135126
) -> Result<Pin<Box<dyn RecordBatchStream + Send>>, CubeError> {
136127
match plan {
137-
QueryPlan::DataFusionSelect(_, plan, ctx) => {
128+
QueryPlan::DataFusionSelect(plan, ctx) => {
138129
let df = DataFrame::new(ctx.state.clone(), &plan);
139130
let safe_stream = async move {
140131
std::panic::AssertUnwindSafe(df.execute_stream())

rust/cubesql/cubesql/src/compile/query_engine.rs

+14-12
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use crate::{
1717
analysis::LogicalPlanAnalysis,
1818
converter::{LogicalPlanToLanguageContext, LogicalPlanToLanguageConverter},
1919
},
20-
CompilationError, CompilationResult, DatabaseProtocol, QueryPlan, Rewriter, StatusFlags,
20+
CompilationError, CompilationResult, DatabaseProtocol, QueryPlan, Rewriter,
2121
},
2222
config::ConfigObj,
2323
sql::{
@@ -46,8 +46,12 @@ use datafusion::{
4646

4747
#[async_trait::async_trait]
4848
pub trait QueryEngine {
49+
/// Custom type for AST statement type, It allows to use any parsers for SQL
4950
type AstStatementType: std::fmt::Display + Send;
5051

52+
/// Additional metadata for results of plan method instead of extending query plan
53+
type PlanMetadataType: std::fmt::Debug + Send;
54+
5155
fn compiler_cache_ref(&self) -> &Arc<dyn CompilerCache>;
5256

5357
fn transport_ref(&self) -> &Arc<dyn TransportService>;
@@ -70,7 +74,7 @@ pub trait QueryEngine {
7074
&self,
7175
cube_ctx: &CubeContext,
7276
stmt: &Self::AstStatementType,
73-
) -> Result<LogicalPlan, DataFusionError>;
77+
) -> Result<(LogicalPlan, Self::PlanMetadataType), DataFusionError>;
7478

7579
fn sanitize_statement(&self, stmt: &Self::AstStatementType) -> Self::AstStatementType;
7680

@@ -81,11 +85,11 @@ pub trait QueryEngine {
8185
span_id: Option<Arc<SpanId>>,
8286
meta: Arc<MetaContext>,
8387
state: Arc<SessionState>,
84-
) -> CompilationResult<QueryPlan> {
88+
) -> CompilationResult<(QueryPlan, Self::PlanMetadataType)> {
8589
let ctx = self.create_session_ctx(state.clone())?;
8690
let cube_ctx = self.create_cube_ctx(state.clone(), meta.clone(), ctx.clone())?;
8791

88-
let plan = self.create_logical_plan(&cube_ctx, &stmt).map_err(|err| {
92+
let (plan, metadata) = self.create_logical_plan(&cube_ctx, &stmt).map_err(|err| {
8993
let message = format!("Initial planning error: {}", err,);
9094
let meta = Some(HashMap::from([
9195
("query".to_string(), stmt.to_string()),
@@ -249,11 +253,7 @@ pub trait QueryEngine {
249253
qtrace.set_best_plan_and_cube_scans(&rewrite_plan);
250254
}
251255

252-
Ok(QueryPlan::DataFusionSelect(
253-
StatusFlags::empty(),
254-
rewrite_plan,
255-
ctx,
256-
))
256+
Ok((QueryPlan::DataFusionSelect(rewrite_plan, ctx), metadata))
257257
}
258258

259259
fn evaluate_wrapped_sql(
@@ -308,6 +308,8 @@ impl SqlQueryEngine {
308308
impl QueryEngine for SqlQueryEngine {
309309
type AstStatementType = sqlparser::ast::Statement;
310310

311+
type PlanMetadataType = ();
312+
311313
fn create_cube_ctx(
312314
&self,
313315
state: Arc<SessionState>,
@@ -470,12 +472,12 @@ impl QueryEngine for SqlQueryEngine {
470472
&self,
471473
cube_ctx: &CubeContext,
472474
stmt: &Self::AstStatementType,
473-
) -> Result<LogicalPlan, DataFusionError> {
475+
) -> Result<(LogicalPlan, Self::PlanMetadataType), DataFusionError> {
474476
let df_query_planner = SqlToRel::new_with_options(cube_ctx, true);
475477
let plan =
476-
df_query_planner.statement_to_plan(DFStatement::Statement(Box::new(stmt.clone())));
478+
df_query_planner.statement_to_plan(DFStatement::Statement(Box::new(stmt.clone())))?;
477479

478-
plan
480+
Ok((plan, ()))
479481
}
480482

481483
fn compiler_cache_ref(&self) -> &Arc<dyn CompilerCache> {

rust/cubesql/cubesql/src/compile/router.rs

+9-7
Original file line numberDiff line numberDiff line change
@@ -356,8 +356,8 @@ impl QueryRouter {
356356
)])],
357357
)),
358358
)),
359-
QueryPlan::DataFusionSelect(flags, plan, context)
360-
| QueryPlan::CreateTempTable(flags, plan, context, _, _) => {
359+
QueryPlan::DataFusionSelect(plan, context)
360+
| QueryPlan::CreateTempTable(plan, context, _, _) => {
361361
// EXPLAIN over CREATE TABLE AS shows the SELECT query plan
362362
let plan = Arc::new(plan);
363363
let schema = LogicalPlan::explain_schema();
@@ -385,7 +385,7 @@ impl QueryRouter {
385385
})
386386
};
387387

388-
Ok(QueryPlan::DataFusionSelect(flags, explain_plan, context))
388+
Ok(QueryPlan::DataFusionSelect(explain_plan, context))
389389
}
390390
}
391391
})
@@ -596,7 +596,7 @@ impl QueryRouter {
596596
span_id: Option<Arc<SpanId>>,
597597
) -> Result<QueryPlan, CompilationError> {
598598
let plan = self.select_to_plan(stmt, qtrace, span_id).await?;
599-
let QueryPlan::DataFusionSelect(flags, plan, ctx) = plan else {
599+
let QueryPlan::DataFusionSelect(plan, ctx) = plan else {
600600
return Err(CompilationError::internal(
601601
"unable to build DataFusion plan from Query".to_string(),
602602
));
@@ -608,8 +608,8 @@ impl QueryRouter {
608608
"table name contains no ident parts".to_string(),
609609
));
610610
};
611+
611612
Ok(QueryPlan::CreateTempTable(
612-
flags,
613613
plan,
614614
ctx,
615615
table_name.value.to_string(),
@@ -708,9 +708,11 @@ impl QueryRouter {
708708
}
709709

710710
let sql_query_engine = SqlQueryEngine::new(self.session_manager.clone());
711-
sql_query_engine
711+
let (plan, _) = sql_query_engine
712712
.plan(stmt, qtrace, span_id, self.meta.clone(), self.state.clone())
713-
.await
713+
.await?;
714+
715+
Ok(plan)
714716
}
715717
}
716718

rust/cubesql/cubesql/src/compile/test/mod.rs

+5-3
Original file line numberDiff line numberDiff line change
@@ -933,19 +933,21 @@ impl TestContext {
933933
.await
934934
.map_err(|e| CubeError::internal(format!("Error during planning: {}", e)))?;
935935
match query {
936-
QueryPlan::DataFusionSelect(flags, plan, ctx) => {
936+
QueryPlan::DataFusionSelect(plan, ctx) => {
937937
let df = DFDataFrame::new(ctx.state, &plan);
938938
let batches = df.collect().await?;
939939
let frame = batches_to_dataframe(&df.schema().into(), batches)?;
940940

941941
output.push(frame.print());
942-
output_flags = flags;
943942
}
944943
QueryPlan::MetaTabular(flags, frame) => {
945944
output.push(frame.print());
946945
output_flags = flags;
947946
}
948-
QueryPlan::MetaOk(flags, _) | QueryPlan::CreateTempTable(flags, _, _, _, _) => {
947+
QueryPlan::CreateTempTable(_, _, _, _) => {
948+
// nothing to do
949+
}
950+
QueryPlan::MetaOk(flags, _) => {
949951
output_flags = flags;
950952
}
951953
}

rust/cubesql/cubesql/src/sql/postgres/extended.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -492,7 +492,7 @@ impl Portal {
492492

493493
return;
494494
}
495-
QueryPlan::DataFusionSelect(_, plan, ctx) => {
495+
QueryPlan::DataFusionSelect(plan, ctx) => {
496496
let df = DFDataFrame::new(ctx.state.clone(), &plan);
497497
let safe_stream = async move {
498498
std::panic::AssertUnwindSafe(df.execute_stream())
@@ -511,7 +511,7 @@ impl Portal {
511511
Err(err) => return yield Err(CubeError::panic(err).into()),
512512
}
513513
}
514-
QueryPlan::CreateTempTable(_, plan, ctx, name, temp_tables) => {
514+
QueryPlan::CreateTempTable(plan, ctx, name, temp_tables) => {
515515
let df = DFDataFrame::new(ctx.state.clone(), &plan);
516516
let record_batch = df.collect();
517517
let row_count = match record_batch.await {

rust/cubesql/cubesql/src/sql/postgres/shim.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ impl QueryPlanExt for QueryPlan {
7272
required_format: protocol::Format,
7373
) -> Result<Option<protocol::RowDescription>, ConnectionError> {
7474
match &self {
75-
QueryPlan::MetaOk(_, _) | QueryPlan::CreateTempTable(_, _, _, _, _) => Ok(None),
75+
QueryPlan::MetaOk(_, _) | QueryPlan::CreateTempTable(_, _, _, _) => Ok(None),
7676
QueryPlan::MetaTabular(_, frame) => {
7777
let mut result = vec![];
7878

@@ -86,7 +86,7 @@ impl QueryPlanExt for QueryPlan {
8686

8787
Ok(Some(protocol::RowDescription::new(result)))
8888
}
89-
QueryPlan::DataFusionSelect(_, logical_plan, _) => {
89+
QueryPlan::DataFusionSelect(logical_plan, _) => {
9090
let mut result = vec![];
9191

9292
for field in logical_plan.schema().fields() {

0 commit comments

Comments
 (0)