@@ -3,7 +3,7 @@ use crate::compile::{
3
3
StatusFlags ,
4
4
} ;
5
5
use sqlparser:: ast;
6
- use std:: { collections:: HashMap , future :: Future , pin :: Pin , sync:: Arc , time:: SystemTime } ;
6
+ use std:: { collections:: HashMap , sync:: Arc , time:: SystemTime } ;
7
7
8
8
use crate :: {
9
9
compile:: {
@@ -30,7 +30,7 @@ use datafusion::{
30
30
scalar:: ScalarValue ,
31
31
} ;
32
32
use itertools:: Itertools ;
33
- use sqlparser:: ast:: { escape_single_quote_string, ObjectName } ;
33
+ use sqlparser:: ast:: escape_single_quote_string;
34
34
35
35
#[ derive( Clone ) ]
36
36
pub struct QueryRouter {
@@ -108,6 +108,23 @@ impl QueryRouter {
108
108
}
109
109
110
110
pub async fn plan (
111
+ & self ,
112
+ stmt : ast:: Statement ,
113
+ qtrace : & mut Option < Qtrace > ,
114
+ span_id : Option < Arc < SpanId > > ,
115
+ ) -> CompilationResult < QueryPlan > {
116
+ match stmt {
117
+ ast:: Statement :: Explain {
118
+ analyze,
119
+ statement,
120
+ verbose,
121
+ ..
122
+ } => self . explain_to_plan ( statement, verbose, analyze) . await ,
123
+ other => self . plan_query ( & other, qtrace, span_id) . await ,
124
+ }
125
+ }
126
+
127
+ async fn plan_query (
111
128
& self ,
112
129
stmt : & ast:: Statement ,
113
130
qtrace : & mut Option < Qtrace > ,
@@ -134,15 +151,6 @@ impl QueryRouter {
134
151
( ast:: Statement :: ShowVariable { variable } , _) => {
135
152
self . show_variable_to_plan ( variable, span_id. clone ( ) ) . await
136
153
}
137
- (
138
- ast:: Statement :: Explain {
139
- statement,
140
- verbose,
141
- analyze,
142
- ..
143
- } ,
144
- _,
145
- ) => self . explain_to_plan ( & statement, * verbose, * analyze) . await ,
146
154
( ast:: Statement :: StartTransaction { .. } , DatabaseProtocol :: PostgreSQL ) => {
147
155
// TODO: Real support
148
156
Ok ( QueryPlan :: MetaOk (
@@ -260,68 +268,62 @@ impl QueryRouter {
260
268
. await
261
269
}
262
270
263
- fn explain_to_plan (
271
+ async fn explain_to_plan (
264
272
& self ,
265
- statement : & Box < ast:: Statement > ,
273
+ statement : Box < ast:: Statement > ,
266
274
verbose : bool ,
267
275
analyze : bool ,
268
- ) -> Pin < Box < dyn Future < Output = Result < QueryPlan , CompilationError > > + Send > > {
269
- let self_cloned = self . clone ( ) ;
270
-
271
- let statement = statement. clone ( ) ;
272
- // This Boxing construct here because of recursive call to self.plan()
273
- Box :: pin ( async move {
274
- // TODO span_id ?
275
- let plan = self_cloned. plan ( & statement, & mut None , None ) . await ?;
276
+ ) -> Result < QueryPlan , CompilationError > {
277
+ // TODO span_id ?
278
+ let plan = self . plan_query ( & statement, & mut None , None ) . await ?;
276
279
277
- match plan {
278
- QueryPlan :: MetaOk ( _, _) | QueryPlan :: MetaTabular ( _, _) => Ok ( QueryPlan :: MetaTabular (
279
- StatusFlags :: empty ( ) ,
280
- Box :: new ( dataframe:: DataFrame :: new (
281
- vec ! [ dataframe:: Column :: new(
282
- "Execution Plan" . to_string( ) ,
283
- ColumnType :: String ,
284
- ColumnFlags :: empty( ) ,
285
- ) ] ,
286
- vec ! [ dataframe:: Row :: new( vec![ dataframe:: TableValue :: String (
287
- "This query doesnt have a plan, because it already has values for response"
288
- . to_string( ) ,
289
- ) ] ) ] ,
290
- ) ) ,
280
+ match plan {
281
+ QueryPlan :: MetaOk ( _, _) | QueryPlan :: MetaTabular ( _, _) => Ok ( QueryPlan :: MetaTabular (
282
+ StatusFlags :: empty ( ) ,
283
+ Box :: new ( dataframe:: DataFrame :: new (
284
+ vec ! [ dataframe:: Column :: new(
285
+ "Execution Plan" . to_string( ) ,
286
+ ColumnType :: String ,
287
+ ColumnFlags :: empty( ) ,
288
+ ) ] ,
289
+ vec ! [ dataframe:: Row :: new( vec![ dataframe:: TableValue :: String (
290
+ "This query doesnt have a plan, because it already has values for response"
291
+ . to_string( ) ,
292
+ ) ] ) ] ,
291
293
) ) ,
292
- QueryPlan :: DataFusionSelect ( plan, context)
293
- | QueryPlan :: CreateTempTable ( plan, context, _, _) => {
294
- // EXPLAIN over CREATE TABLE AS shows the SELECT query plan
295
- let plan = Arc :: new ( plan) ;
296
- let schema = LogicalPlan :: explain_schema ( ) ;
297
- let schema = schema. to_dfschema_ref ( ) . map_err ( |err| {
298
- CompilationError :: internal ( format ! (
299
- "Unable to get DF schema for explain plan: {}" ,
300
- err
301
- ) )
302
- } ) ?;
303
-
304
- let explain_plan = if analyze {
305
- LogicalPlan :: Analyze ( Analyze {
306
- verbose,
307
- input : plan,
308
- schema,
309
- } )
310
- } else {
311
- let stringified_plans = vec ! [ plan. to_stringified( PlanType :: InitialLogicalPlan ) ] ;
312
-
313
- LogicalPlan :: Explain ( Explain {
314
- verbose,
315
- plan,
316
- stringified_plans,
317
- schema,
318
- } )
319
- } ;
294
+ ) ) ,
295
+ QueryPlan :: DataFusionSelect ( plan, context)
296
+ | QueryPlan :: CreateTempTable ( plan, context, _, _) => {
297
+ // EXPLAIN over CREATE TABLE AS shows the SELECT query plan
298
+ let plan = Arc :: new ( plan) ;
299
+ let schema = LogicalPlan :: explain_schema ( ) ;
300
+ let schema = schema. to_dfschema_ref ( ) . map_err ( |err| {
301
+ CompilationError :: internal ( format ! (
302
+ "Unable to get DF schema for explain plan: {}" ,
303
+ err
304
+ ) )
305
+ } ) ?;
320
306
321
- Ok ( QueryPlan :: DataFusionSelect ( explain_plan, context) )
322
- }
307
+ let explain_plan = if analyze {
308
+ LogicalPlan :: Analyze ( Analyze {
309
+ verbose,
310
+ input : plan,
311
+ schema,
312
+ } )
313
+ } else {
314
+ let stringified_plans = vec ! [ plan. to_stringified( PlanType :: InitialLogicalPlan ) ] ;
315
+
316
+ LogicalPlan :: Explain ( Explain {
317
+ verbose,
318
+ plan,
319
+ stringified_plans,
320
+ schema,
321
+ } )
322
+ } ;
323
+
324
+ Ok ( QueryPlan :: DataFusionSelect ( explain_plan, context) )
323
325
}
324
- } )
326
+ }
325
327
}
326
328
327
329
fn set_role_to_plan (
@@ -535,7 +537,7 @@ impl QueryRouter {
535
537
) ) ;
536
538
} ;
537
539
538
- let ObjectName ( ident_parts) = name;
540
+ let ast :: ObjectName ( ident_parts) = name;
539
541
let Some ( table_name) = ident_parts. last ( ) else {
540
542
return Err ( CompilationError :: internal (
541
543
"table name contains no ident parts" . to_string ( ) ,
@@ -585,7 +587,7 @@ impl QueryRouter {
585
587
"DROP TABLE supports dropping only one table at a time" . to_string ( ) ,
586
588
) ) ;
587
589
}
588
- let ObjectName ( ident_parts) = names. first ( ) . unwrap ( ) ;
590
+ let ast :: ObjectName ( ident_parts) = names. first ( ) . unwrap ( ) ;
589
591
let Some ( table_name) = ident_parts. last ( ) else {
590
592
return Err ( CompilationError :: internal (
591
593
"table name contains no ident parts" . to_string ( ) ,
@@ -674,7 +676,7 @@ pub async fn convert_statement_to_cube_query(
674
676
}
675
677
676
678
let planner = QueryRouter :: new ( session. state . clone ( ) , meta, session. session_manager . clone ( ) ) ;
677
- planner. plan ( & stmt, qtrace, span_id) . await
679
+ planner. plan ( stmt, qtrace, span_id) . await
678
680
}
679
681
680
682
pub async fn convert_sql_to_cube_query (
0 commit comments