@@ -267,22 +267,64 @@ pub struct Limit {
267
267
/// Evaluates correlated sub queries
268
268
#[ derive( Clone ) ]
269
269
pub struct Subquery {
270
- /// The list of sub queries
271
- pub subqueries : Vec < LogicalPlan > ,
272
270
/// The incoming logical plan
273
271
pub input : Arc < LogicalPlan > ,
272
+ /// The list of sub queries
273
+ pub subqueries : Vec < LogicalPlan > ,
274
+ /// The list of subquery types
275
+ pub types : Vec < SubqueryType > ,
274
276
/// The schema description of the output
275
277
pub schema : DFSchemaRef ,
276
278
}
277
279
280
+ /// Subquery type
281
+ #[ derive( Debug , Clone , Copy , PartialEq , Eq , Hash , PartialOrd , Ord ) ]
282
+ pub enum SubqueryType {
283
+ /// Scalar (SELECT, WHERE) evaluating to one value
284
+ Scalar ,
285
+ // This will be extended with `Exists` and `AnyAll` types.
286
+ }
287
+
288
+ impl Display for SubqueryType {
289
+ fn fmt ( & self , f : & mut Formatter ) -> fmt:: Result {
290
+ let subquery_type = match self {
291
+ SubqueryType :: Scalar => "Scalar" ,
292
+ } ;
293
+ write ! ( f, "{}" , subquery_type)
294
+ }
295
+ }
296
+
278
297
impl Subquery {
279
298
/// Merge schema of main input and correlated subquery columns
280
- pub fn merged_schema ( input : & LogicalPlan , subqueries : & [ LogicalPlan ] ) -> DFSchema {
281
- subqueries. iter ( ) . fold ( ( * * input. schema ( ) ) . clone ( ) , |a, b| {
282
- let mut res = a;
283
- res. merge ( b. schema ( ) ) ;
284
- res
285
- } )
299
+ pub fn merged_schema (
300
+ input : & LogicalPlan ,
301
+ subqueries : & [ LogicalPlan ] ,
302
+ types : & [ SubqueryType ] ,
303
+ ) -> DFSchema {
304
+ subqueries. iter ( ) . zip ( types. iter ( ) ) . fold (
305
+ ( * * input. schema ( ) ) . clone ( ) ,
306
+ |schema, ( plan, typ) | {
307
+ let mut schema = schema;
308
+ schema. merge ( & Self :: transform_dfschema ( plan. schema ( ) , * typ) ) ;
309
+ schema
310
+ } ,
311
+ )
312
+ }
313
+
314
+ /// Transform DataFusion schema according to subquery type
315
+ pub fn transform_dfschema ( schema : & DFSchema , typ : SubqueryType ) -> DFSchema {
316
+ match typ {
317
+ SubqueryType :: Scalar => schema. clone ( ) ,
318
+ // Schema will be transformed for `Exists` and `AnyAll`
319
+ }
320
+ }
321
+
322
+ /// Transform Arrow field according to subquery type
323
+ pub fn transform_field ( field : & Field , typ : SubqueryType ) -> Field {
324
+ match typ {
325
+ SubqueryType :: Scalar => field. clone ( ) ,
326
+ // Field will be transformed for `Exists` and `AnyAll`
327
+ }
286
328
}
287
329
}
288
330
@@ -475,13 +517,23 @@ impl LogicalPlan {
475
517
LogicalPlan :: Values ( Values { schema, .. } ) => vec ! [ schema] ,
476
518
LogicalPlan :: Window ( Window { input, schema, .. } )
477
519
| LogicalPlan :: Projection ( Projection { input, schema, .. } )
478
- | LogicalPlan :: Subquery ( Subquery { input, schema, .. } )
479
520
| LogicalPlan :: Aggregate ( Aggregate { input, schema, .. } )
480
521
| LogicalPlan :: TableUDFs ( TableUDFs { input, schema, .. } ) => {
481
522
let mut schemas = input. all_schemas ( ) ;
482
523
schemas. insert ( 0 , schema) ;
483
524
schemas
484
525
}
526
+ LogicalPlan :: Subquery ( Subquery {
527
+ input,
528
+ subqueries,
529
+ schema,
530
+ ..
531
+ } ) => {
532
+ let mut schemas = input. all_schemas ( ) ;
533
+ schemas. extend ( subqueries. iter ( ) . map ( |s| s. schema ( ) ) ) ;
534
+ schemas. insert ( 0 , schema) ;
535
+ schemas
536
+ }
485
537
LogicalPlan :: Join ( Join {
486
538
left,
487
539
right,
@@ -1063,7 +1115,9 @@ impl LogicalPlan {
1063
1115
}
1064
1116
Ok ( ( ) )
1065
1117
}
1066
- LogicalPlan :: Subquery ( Subquery { .. } ) => write ! ( f, "Subquery" ) ,
1118
+ LogicalPlan :: Subquery ( Subquery { types, .. } ) => {
1119
+ write ! ( f, "Subquery: types={:?}" , types)
1120
+ }
1067
1121
LogicalPlan :: Filter ( Filter {
1068
1122
predicate : ref expr,
1069
1123
..
0 commit comments