Skip to content

Commit ca0cae3

Browse files
authored
Fix qualify_columns failing on correlated scalar subqueries (#51)
1 parent 4327daa commit ca0cae3

File tree

3 files changed

+350
-3
lines changed

3 files changed

+350
-3
lines changed

crates/polyglot-sql/src/lineage.rs

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1843,6 +1843,120 @@ mod tests {
18431843
assert_eq!(root_with_schema, Some(DataType::Text));
18441844
}
18451845

1846+
#[test]
1847+
fn test_lineage_with_schema_correlated_scalar_subquery() {
1848+
let query =
1849+
"SELECT id, (SELECT AVG(val) FROM t2 WHERE t2.id = t1.id) AS avg_val FROM t1";
1850+
let dialect = Dialect::get(DialectType::BigQuery);
1851+
let expr = dialect
1852+
.parse(query)
1853+
.unwrap()
1854+
.into_iter()
1855+
.next()
1856+
.expect("expected one expression");
1857+
1858+
let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
1859+
schema
1860+
.add_table(
1861+
"t1",
1862+
&[("id".into(), DataType::BigInt { length: None })],
1863+
None,
1864+
)
1865+
.expect("schema setup");
1866+
schema
1867+
.add_table(
1868+
"t2",
1869+
&[
1870+
("id".into(), DataType::BigInt { length: None }),
1871+
("val".into(), DataType::BigInt { length: None }),
1872+
],
1873+
None,
1874+
)
1875+
.expect("schema setup");
1876+
1877+
let node = lineage_with_schema(
1878+
"id",
1879+
&expr,
1880+
Some(&schema),
1881+
Some(DialectType::BigQuery),
1882+
false,
1883+
)
1884+
.expect("lineage_with_schema should handle correlated scalar subqueries");
1885+
1886+
assert_eq!(node.name, "id");
1887+
}
1888+
1889+
#[test]
1890+
fn test_lineage_with_schema_join_using() {
1891+
let query = "SELECT a FROM t1 JOIN t2 USING(a)";
1892+
let dialect = Dialect::get(DialectType::BigQuery);
1893+
let expr = dialect
1894+
.parse(query)
1895+
.unwrap()
1896+
.into_iter()
1897+
.next()
1898+
.expect("expected one expression");
1899+
1900+
let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
1901+
schema
1902+
.add_table(
1903+
"t1",
1904+
&[("a".into(), DataType::BigInt { length: None })],
1905+
None,
1906+
)
1907+
.expect("schema setup");
1908+
schema
1909+
.add_table(
1910+
"t2",
1911+
&[("a".into(), DataType::BigInt { length: None })],
1912+
None,
1913+
)
1914+
.expect("schema setup");
1915+
1916+
let node = lineage_with_schema(
1917+
"a",
1918+
&expr,
1919+
Some(&schema),
1920+
Some(DialectType::BigQuery),
1921+
false,
1922+
)
1923+
.expect("lineage_with_schema should handle JOIN USING");
1924+
1925+
assert_eq!(node.name, "a");
1926+
}
1927+
1928+
#[test]
1929+
fn test_lineage_with_schema_qualified_table_name() {
1930+
let query = "SELECT a FROM raw.t1";
1931+
let dialect = Dialect::get(DialectType::BigQuery);
1932+
let expr = dialect
1933+
.parse(query)
1934+
.unwrap()
1935+
.into_iter()
1936+
.next()
1937+
.expect("expected one expression");
1938+
1939+
let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
1940+
schema
1941+
.add_table(
1942+
"raw.t1",
1943+
&[("a".into(), DataType::BigInt { length: None })],
1944+
None,
1945+
)
1946+
.expect("schema setup");
1947+
1948+
let node = lineage_with_schema(
1949+
"a",
1950+
&expr,
1951+
Some(&schema),
1952+
Some(DialectType::BigQuery),
1953+
false,
1954+
)
1955+
.expect("lineage_with_schema should handle dotted schema.table names");
1956+
1957+
assert_eq!(node.name, "a");
1958+
}
1959+
18461960
#[test]
18471961
fn test_lineage_with_schema_none_matches_lineage() {
18481962
let expr = parse("SELECT a FROM t");

crates/polyglot-sql/src/optimizer/qualify_columns.rs

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,13 +222,66 @@ pub fn validate_qualify_columns(expression: &Expression) -> QualifyColumnsResult
222222
Ok(())
223223
}
224224

225+
/// Collect USING column names from JOIN clauses and register each with the
226+
/// resolver, mapping them to the first FROM-clause source that contains them.
227+
fn register_using_columns(select: &Select, resolver: &mut Resolver) {
228+
let using_cols: Vec<String> = select
229+
.joins
230+
.iter()
231+
.flat_map(|j| j.using.iter().map(|id| id.name.clone()))
232+
.collect();
233+
234+
if using_cols.is_empty() {
235+
return;
236+
}
237+
238+
// Collect source names from the FROM clause (left side of joins) in order.
239+
let from_sources: Vec<String> = select
240+
.from
241+
.as_ref()
242+
.map(|f| {
243+
f.expressions
244+
.iter()
245+
.filter_map(|expr| match expr {
246+
Expression::Table(t) => Some(
247+
t.alias
248+
.as_ref()
249+
.map(|a| a.name.clone())
250+
.unwrap_or_else(|| t.name.name.clone()),
251+
),
252+
_ => None,
253+
})
254+
.collect()
255+
})
256+
.unwrap_or_default();
257+
258+
for col_name in using_cols {
259+
// Find the first FROM-clause source that contains this column.
260+
let table = from_sources.iter().find_map(|source| {
261+
resolver
262+
.get_source_columns(source)
263+
.ok()
264+
.filter(|cols| cols.contains(&col_name))
265+
.map(|_| source.clone())
266+
});
267+
if let Some(table_name) = table {
268+
resolver.add_using_column(col_name, table_name);
269+
}
270+
}
271+
}
272+
225273
/// Qualify columns in a scope by adding table qualifiers
226274
fn qualify_columns_in_scope(
227275
select: &mut Select,
228276
scope: &Scope,
229277
resolver: &mut Resolver,
230278
allow_partial: bool,
231279
) -> QualifyColumnsResult<()> {
280+
// Register JOIN USING columns so the resolver can disambiguate them.
281+
// USING columns exist in both joined tables; resolve each to a FROM-clause
282+
// source (the left side of the join).
283+
register_using_columns(select, resolver);
284+
232285
for expr in &mut select.expressions {
233286
qualify_columns_in_expression(expr, scope, resolver, allow_partial)?;
234287
}
@@ -468,6 +521,12 @@ fn qualify_single_column(
468521
if let Some(table) = &col.table {
469522
let table_name = &table.name;
470523
if !scope.sources.contains_key(table_name) {
524+
// Allow correlated references: if the table exists in the schema
525+
// but not in the current scope, it may be referencing an outer scope
526+
// (e.g., in a correlated scalar subquery).
527+
if resolver.table_exists_in_schema(table_name) {
528+
return Ok(());
529+
}
471530
return Err(QualifyColumnsError::UnknownTable(table_name.clone()));
472531
}
473532

@@ -2467,6 +2526,143 @@ mod tests {
24672526
assert!(sql.contains("t.b"));
24682527
}
24692528

2529+
#[test]
2530+
fn test_qualify_columns_join_using() {
2531+
let expr = parse("SELECT a FROM t1 JOIN t2 USING(a)");
2532+
2533+
let mut schema = MappingSchema::new();
2534+
schema
2535+
.add_table(
2536+
"t1",
2537+
&[("a".to_string(), DataType::BigInt { length: None })],
2538+
None,
2539+
)
2540+
.expect("schema setup");
2541+
schema
2542+
.add_table(
2543+
"t2",
2544+
&[("a".to_string(), DataType::BigInt { length: None })],
2545+
None,
2546+
)
2547+
.expect("schema setup");
2548+
2549+
let result =
2550+
qualify_columns(expr, &schema, &QualifyColumnsOptions::new()).expect("qualify");
2551+
let sql = gen(&result);
2552+
2553+
// The USING column should be qualified with the left (FROM) table
2554+
assert!(sql.contains("t1.a"), "USING column should resolve to FROM table: {sql}");
2555+
}
2556+
2557+
#[test]
2558+
fn test_qualify_columns_join_using_multiple_columns() {
2559+
let expr = parse("SELECT a, b FROM t1 JOIN t2 USING(a, b)");
2560+
2561+
let mut schema = MappingSchema::new();
2562+
schema
2563+
.add_table(
2564+
"t1",
2565+
&[
2566+
("a".to_string(), DataType::BigInt { length: None }),
2567+
("b".to_string(), DataType::BigInt { length: None }),
2568+
],
2569+
None,
2570+
)
2571+
.expect("schema setup");
2572+
schema
2573+
.add_table(
2574+
"t2",
2575+
&[
2576+
("a".to_string(), DataType::BigInt { length: None }),
2577+
("b".to_string(), DataType::BigInt { length: None }),
2578+
],
2579+
None,
2580+
)
2581+
.expect("schema setup");
2582+
2583+
let result =
2584+
qualify_columns(expr, &schema, &QualifyColumnsOptions::new()).expect("qualify");
2585+
let sql = gen(&result);
2586+
2587+
assert!(sql.contains("t1.a"), "USING column 'a' should resolve: {sql}");
2588+
assert!(sql.contains("t1.b"), "USING column 'b' should resolve: {sql}");
2589+
}
2590+
2591+
#[test]
2592+
fn test_qualify_columns_qualified_table_name() {
2593+
let expr = parse("SELECT a FROM raw.t1");
2594+
2595+
let mut schema = MappingSchema::new();
2596+
schema
2597+
.add_table(
2598+
"raw.t1",
2599+
&[("a".to_string(), DataType::BigInt { length: None })],
2600+
None,
2601+
)
2602+
.expect("schema setup");
2603+
2604+
let result =
2605+
qualify_columns(expr, &schema, &QualifyColumnsOptions::new()).expect("qualify");
2606+
let sql = gen(&result);
2607+
2608+
assert!(
2609+
sql.contains("t1.a"),
2610+
"column should be qualified with table name: {sql}"
2611+
);
2612+
}
2613+
2614+
#[test]
2615+
fn test_qualify_columns_correlated_scalar_subquery() {
2616+
let expr =
2617+
parse("SELECT id, (SELECT AVG(val) FROM t2 WHERE t2.id = t1.id) AS avg_val FROM t1");
2618+
2619+
let mut schema = MappingSchema::new();
2620+
schema
2621+
.add_table(
2622+
"t1",
2623+
&[("id".to_string(), DataType::BigInt { length: None })],
2624+
None,
2625+
)
2626+
.expect("schema setup");
2627+
schema
2628+
.add_table(
2629+
"t2",
2630+
&[
2631+
("id".to_string(), DataType::BigInt { length: None }),
2632+
("val".to_string(), DataType::BigInt { length: None }),
2633+
],
2634+
None,
2635+
)
2636+
.expect("schema setup");
2637+
2638+
let result =
2639+
qualify_columns(expr, &schema, &QualifyColumnsOptions::new()).expect("qualify");
2640+
let sql = gen(&result);
2641+
2642+
assert!(sql.contains("t1.id"), "outer column should be qualified: {sql}");
2643+
assert!(sql.contains("t2.id"), "inner column should be qualified: {sql}");
2644+
}
2645+
2646+
#[test]
2647+
fn test_qualify_columns_rejects_unknown_table() {
2648+
let expr = parse("SELECT id FROM t1 WHERE nonexistent.col = 1");
2649+
2650+
let mut schema = MappingSchema::new();
2651+
schema
2652+
.add_table(
2653+
"t1",
2654+
&[("id".to_string(), DataType::BigInt { length: None })],
2655+
None,
2656+
)
2657+
.expect("schema setup");
2658+
2659+
let result = qualify_columns(expr, &schema, &QualifyColumnsOptions::new());
2660+
assert!(
2661+
result.is_err(),
2662+
"should reject reference to table not in scope or schema"
2663+
);
2664+
}
2665+
24702666
// ======================================================================
24712667
// quote_identifiers tests
24722668
// ======================================================================

0 commit comments

Comments
 (0)