Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 114 additions & 0 deletions crates/polyglot-sql/src/lineage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1843,6 +1843,120 @@ mod tests {
assert_eq!(root_with_schema, Some(DataType::Text));
}

#[test]
fn test_lineage_with_schema_correlated_scalar_subquery() {
let query =
"SELECT id, (SELECT AVG(val) FROM t2 WHERE t2.id = t1.id) AS avg_val FROM t1";
let dialect = Dialect::get(DialectType::BigQuery);
let expr = dialect
.parse(query)
.unwrap()
.into_iter()
.next()
.expect("expected one expression");

let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
schema
.add_table(
"t1",
&[("id".into(), DataType::BigInt { length: None })],
None,
)
.expect("schema setup");
schema
.add_table(
"t2",
&[
("id".into(), DataType::BigInt { length: None }),
("val".into(), DataType::BigInt { length: None }),
],
None,
)
.expect("schema setup");

let node = lineage_with_schema(
"id",
&expr,
Some(&schema),
Some(DialectType::BigQuery),
false,
)
.expect("lineage_with_schema should handle correlated scalar subqueries");

assert_eq!(node.name, "id");
}

#[test]
fn test_lineage_with_schema_join_using() {
let query = "SELECT a FROM t1 JOIN t2 USING(a)";
let dialect = Dialect::get(DialectType::BigQuery);
let expr = dialect
.parse(query)
.unwrap()
.into_iter()
.next()
.expect("expected one expression");

let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
schema
.add_table(
"t1",
&[("a".into(), DataType::BigInt { length: None })],
None,
)
.expect("schema setup");
schema
.add_table(
"t2",
&[("a".into(), DataType::BigInt { length: None })],
None,
)
.expect("schema setup");

let node = lineage_with_schema(
"a",
&expr,
Some(&schema),
Some(DialectType::BigQuery),
false,
)
.expect("lineage_with_schema should handle JOIN USING");

assert_eq!(node.name, "a");
}

#[test]
fn test_lineage_with_schema_qualified_table_name() {
let query = "SELECT a FROM raw.t1";
let dialect = Dialect::get(DialectType::BigQuery);
let expr = dialect
.parse(query)
.unwrap()
.into_iter()
.next()
.expect("expected one expression");

let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
schema
.add_table(
"raw.t1",
&[("a".into(), DataType::BigInt { length: None })],
None,
)
.expect("schema setup");

let node = lineage_with_schema(
"a",
&expr,
Some(&schema),
Some(DialectType::BigQuery),
false,
)
.expect("lineage_with_schema should handle dotted schema.table names");

assert_eq!(node.name, "a");
}

#[test]
fn test_lineage_with_schema_none_matches_lineage() {
let expr = parse("SELECT a FROM t");
Expand Down
196 changes: 196 additions & 0 deletions crates/polyglot-sql/src/optimizer/qualify_columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,13 +222,66 @@ pub fn validate_qualify_columns(expression: &Expression) -> QualifyColumnsResult
Ok(())
}

/// Collect USING column names from JOIN clauses and register each with the
/// resolver, mapping them to the first FROM-clause source that contains them.
fn register_using_columns(select: &Select, resolver: &mut Resolver) {
let using_cols: Vec<String> = select
.joins
.iter()
.flat_map(|j| j.using.iter().map(|id| id.name.clone()))
.collect();

if using_cols.is_empty() {
return;
}

// Collect source names from the FROM clause (left side of joins) in order.
let from_sources: Vec<String> = select
.from
.as_ref()
.map(|f| {
f.expressions
.iter()
.filter_map(|expr| match expr {
Expression::Table(t) => Some(
t.alias
.as_ref()
.map(|a| a.name.clone())
.unwrap_or_else(|| t.name.name.clone()),
),
_ => None,
})
.collect()
})
.unwrap_or_default();

for col_name in using_cols {
// Find the first FROM-clause source that contains this column.
let table = from_sources.iter().find_map(|source| {
resolver
.get_source_columns(source)
.ok()
.filter(|cols| cols.contains(&col_name))
.map(|_| source.clone())
});
if let Some(table_name) = table {
resolver.add_using_column(col_name, table_name);
}
}
}

/// Qualify columns in a scope by adding table qualifiers
fn qualify_columns_in_scope(
select: &mut Select,
scope: &Scope,
resolver: &mut Resolver,
allow_partial: bool,
) -> QualifyColumnsResult<()> {
// Register JOIN USING columns so the resolver can disambiguate them.
// USING columns exist in both joined tables; resolve each to a FROM-clause
// source (the left side of the join).
register_using_columns(select, resolver);

for expr in &mut select.expressions {
qualify_columns_in_expression(expr, scope, resolver, allow_partial)?;
}
Expand Down Expand Up @@ -468,6 +521,12 @@ fn qualify_single_column(
if let Some(table) = &col.table {
let table_name = &table.name;
if !scope.sources.contains_key(table_name) {
// Allow correlated references: if the table exists in the schema
// but not in the current scope, it may be referencing an outer scope
// (e.g., in a correlated scalar subquery).
if resolver.table_exists_in_schema(table_name) {
return Ok(());
}
return Err(QualifyColumnsError::UnknownTable(table_name.clone()));
}

Expand Down Expand Up @@ -2467,6 +2526,143 @@ mod tests {
assert!(sql.contains("t.b"));
}

#[test]
fn test_qualify_columns_join_using() {
let expr = parse("SELECT a FROM t1 JOIN t2 USING(a)");

let mut schema = MappingSchema::new();
schema
.add_table(
"t1",
&[("a".to_string(), DataType::BigInt { length: None })],
None,
)
.expect("schema setup");
schema
.add_table(
"t2",
&[("a".to_string(), DataType::BigInt { length: None })],
None,
)
.expect("schema setup");

let result =
qualify_columns(expr, &schema, &QualifyColumnsOptions::new()).expect("qualify");
let sql = gen(&result);

// The USING column should be qualified with the left (FROM) table
assert!(sql.contains("t1.a"), "USING column should resolve to FROM table: {sql}");
}

#[test]
fn test_qualify_columns_join_using_multiple_columns() {
let expr = parse("SELECT a, b FROM t1 JOIN t2 USING(a, b)");

let mut schema = MappingSchema::new();
schema
.add_table(
"t1",
&[
("a".to_string(), DataType::BigInt { length: None }),
("b".to_string(), DataType::BigInt { length: None }),
],
None,
)
.expect("schema setup");
schema
.add_table(
"t2",
&[
("a".to_string(), DataType::BigInt { length: None }),
("b".to_string(), DataType::BigInt { length: None }),
],
None,
)
.expect("schema setup");

let result =
qualify_columns(expr, &schema, &QualifyColumnsOptions::new()).expect("qualify");
let sql = gen(&result);

assert!(sql.contains("t1.a"), "USING column 'a' should resolve: {sql}");
assert!(sql.contains("t1.b"), "USING column 'b' should resolve: {sql}");
}

#[test]
fn test_qualify_columns_qualified_table_name() {
let expr = parse("SELECT a FROM raw.t1");

let mut schema = MappingSchema::new();
schema
.add_table(
"raw.t1",
&[("a".to_string(), DataType::BigInt { length: None })],
None,
)
.expect("schema setup");

let result =
qualify_columns(expr, &schema, &QualifyColumnsOptions::new()).expect("qualify");
let sql = gen(&result);

assert!(
sql.contains("t1.a"),
"column should be qualified with table name: {sql}"
);
}

#[test]
fn test_qualify_columns_correlated_scalar_subquery() {
let expr =
parse("SELECT id, (SELECT AVG(val) FROM t2 WHERE t2.id = t1.id) AS avg_val FROM t1");

let mut schema = MappingSchema::new();
schema
.add_table(
"t1",
&[("id".to_string(), DataType::BigInt { length: None })],
None,
)
.expect("schema setup");
schema
.add_table(
"t2",
&[
("id".to_string(), DataType::BigInt { length: None }),
("val".to_string(), DataType::BigInt { length: None }),
],
None,
)
.expect("schema setup");

let result =
qualify_columns(expr, &schema, &QualifyColumnsOptions::new()).expect("qualify");
let sql = gen(&result);

assert!(sql.contains("t1.id"), "outer column should be qualified: {sql}");
assert!(sql.contains("t2.id"), "inner column should be qualified: {sql}");
}

#[test]
fn test_qualify_columns_rejects_unknown_table() {
let expr = parse("SELECT id FROM t1 WHERE nonexistent.col = 1");

let mut schema = MappingSchema::new();
schema
.add_table(
"t1",
&[("id".to_string(), DataType::BigInt { length: None })],
None,
)
.expect("schema setup");

let result = qualify_columns(expr, &schema, &QualifyColumnsOptions::new());
assert!(
result.is_err(),
"should reject reference to table not in scope or schema"
);
}

// ======================================================================
// quote_identifiers tests
// ======================================================================
Expand Down
Loading
Loading