From ecd54eb6f2f19d37ea38baecb4c4de9e1470fbb8 Mon Sep 17 00:00:00 2001 From: Kyungsoo Lee Date: Fri, 24 Oct 2025 14:25:23 -0700 Subject: [PATCH] fix(ingest): Handle empty column names from Snowflake access history Snowflake's access history can return empty column names for certain query types (e.g., DELETE, queries on views over external sources like Google Sheets). This was causing invalid schemaField URNs to be sent to GMS. This fix adds two layers of protection: 1. At ingestion source level: Detect empty columns in direct_objects_accessed and fall back to ObservedQuery for DataHub's own SQL parsing 2. At query subjects generation: Skip empty column names when creating schemaField URNs to prevent invalid URN generation --- .../source/snowflake/snowflake_queries.py | 26 ++- .../sql_parsing/sql_parsing_aggregator.py | 9 + .../unit/snowflake/test_snowflake_queries.py | 178 ++++++++++++++++++ ...empty_column_in_query_subjects_golden.json | 58 ++++++ ...ery_subjects_only_column_usage_golden.json | 58 ++++++ .../unit/sql_parsing/test_sql_aggregator.py | 142 ++++++++++++++ 6 files changed, 468 insertions(+), 3 deletions(-) create mode 100644 metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_empty_column_in_query_subjects_golden.json create mode 100644 metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_empty_column_in_query_subjects_only_column_usage_golden.json diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py index fceb76166bb7eb..6c1506cc0651dc 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py @@ -616,6 +616,7 @@ def _parse_audit_log_row( upstreams = [] column_usage = {} + has_empty_column = False for obj in direct_objects_accessed: dataset = self.identifiers.gen_dataset_urn( self.identifiers.get_dataset_identifier_from_qualified_name( @@ -625,13 +626,32 @@ def _parse_audit_log_row( columns = set() for modified_column in obj["columns"]: - columns.add( - self.identifiers.snowflake_identifier(modified_column["columnName"]) - ) + column_name = modified_column["columnName"] + if not column_name or not column_name.strip(): + has_empty_column = True + break + columns.add(self.identifiers.snowflake_identifier(column_name)) + + if has_empty_column: + break upstreams.append(dataset) column_usage[dataset] = columns + if has_empty_column: + return ObservedQuery( + query=query_text, + session_id=res["session_id"], + timestamp=timestamp, + user=user, + default_db=res["default_db"], + default_schema=res["default_schema"], + query_hash=get_query_fingerprint( + query_text, self.identifiers.platform, fast=True + ), + extra_info=extra_info, + ) + downstream = None column_lineage = None for obj in objects_modified: diff --git a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py index a093ce4596a119..ecf49db0ca5e3c 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py @@ -168,6 +168,9 @@ def get_subjects( query_subject_urns.add(upstream) if include_fields: for column in sorted(self.column_usage.get(upstream, [])): + # Skip empty column names to avoid creating invalid URNs + if not column or not column.strip(): + continue query_subject_urns.add( 
builder.make_schema_field_urn(upstream, column)
                     )
@@ -175,6 +178,12 @@
             query_subject_urns.add(downstream_urn)
             if include_fields:
                 for column_lineage in self.column_lineage:
+                    # Skip empty downstream columns to avoid creating invalid URNs
+                    if (
+                        not column_lineage.downstream.column
+                        or not column_lineage.downstream.column.strip()
+                    ):
+                        continue
                     query_subject_urns.add(
                         builder.make_schema_field_urn(
                             downstream_urn, column_lineage.downstream.column
diff --git a/metadata-ingestion/tests/unit/snowflake/test_snowflake_queries.py b/metadata-ingestion/tests/unit/snowflake/test_snowflake_queries.py
index 010de18f1dacfa..bd38eb2e6ee1b9 100644
--- a/metadata-ingestion/tests/unit/snowflake/test_snowflake_queries.py
+++ b/metadata-ingestion/tests/unit/snowflake/test_snowflake_queries.py
@@ -605,6 +605,184 @@ def test_report_counts_with_disabled_features(self):
         assert extractor.report.sql_aggregator.num_preparsed_queries == 0
 
 
+class TestSnowflakeQueryParser:
+    """Tests for the SnowflakeQueriesExtractor._parse_audit_log_row method."""
+
+    def test_parse_query_with_empty_column_name_returns_observed_query(self):
+        """Test that queries with empty column names in direct_objects_accessed return ObservedQuery."""
+        from datetime import datetime, timezone
+
+        from datahub.ingestion.source.snowflake.snowflake_utils import (
+            SnowflakeIdentifierBuilder,
+        )
+        from datahub.sql_parsing.sql_parsing_aggregator import ObservedQuery
+
+        mock_connection = Mock()
+        config = SnowflakeQueriesExtractorConfig(
+            window=BaseTimeWindowConfig(
+                start_time=datetime(2021, 1, 1, tzinfo=timezone.utc),
+                end_time=datetime(2021, 1, 2, tzinfo=timezone.utc),
+            ),
+        )
+        mock_report = Mock()
+        mock_filters = Mock()
+        mock_identifiers = Mock(spec=SnowflakeIdentifierBuilder)
+        mock_identifiers.platform = "snowflake"
+        mock_identifiers.identifier_config = SnowflakeIdentifierConfig()
+        mock_identifiers.gen_dataset_urn = Mock(
+            return_value="urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.test_table,PROD)"
+        )
+        mock_identifiers.get_dataset_identifier_from_qualified_name = Mock(
+            return_value="test_db.test_schema.test_table"
+        )
+        mock_identifiers.snowflake_identifier = Mock(side_effect=lambda x: x)
+        mock_identifiers.get_user_identifier = Mock(return_value="test_user")
+
+        extractor = SnowflakeQueriesExtractor(
+            connection=mock_connection,
+            config=config,
+            structured_report=mock_report,
+            filters=mock_filters,
+            identifiers=mock_identifiers,
+        )
+
+        # Simulate a Snowflake access history row with empty column name
+        import json
+
+        row = {
+            "QUERY_ID": "test_query_123",
+            "ROOT_QUERY_ID": None,
+            "QUERY_TEXT": "SELECT * FROM test_table WHERE id = 1",
+            "QUERY_TYPE": "SELECT",
+            "SESSION_ID": "session_123",
+            "USER_NAME": "test_user",
+            "ROLE_NAME": "test_role",
+            "QUERY_START_TIME": datetime(2021, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
+            "END_TIME": datetime(2021, 1, 1, 12, 0, 1, tzinfo=timezone.utc),
+            "QUERY_DURATION": 1000,
+            "ROWS_INSERTED": 0,
+            "ROWS_UPDATED": 0,
+            "ROWS_DELETED": 0,
+            "DEFAULT_DB": "test_db",
+            "DEFAULT_SCHEMA": "test_schema",
+            "QUERY_COUNT": 1,
+            "QUERY_SECONDARY_FINGERPRINT": "fingerprint_123",
+            "DIRECT_OBJECTS_ACCESSED": json.dumps(
+                [
+                    {
+                        "objectName": "test_db.test_schema.test_table",
+                        "objectDomain": "Table",
+                        "columns": [
+                            {"columnName": "id"},
+                            {"columnName": ""},  # Empty column name
+                            {"columnName": "name"},
+                        ],
+                    }
+                ]
+            ),
+            "OBJECTS_MODIFIED": json.dumps([]),
+            "OBJECT_MODIFIED_BY_DDL": None,
+        }
+
+        users: dict = {}
+
+        result = 
extractor._parse_audit_log_row(row, users) + + # Assert that an ObservedQuery is returned when there's an empty column + assert isinstance(result, ObservedQuery), ( + f"Expected ObservedQuery but got {type(result)}" + ) + assert result.query == "SELECT * FROM test_table WHERE id = 1" + assert result.session_id == "session_123" + assert result.default_db == "test_db" + assert result.default_schema == "test_schema" + + def test_parse_query_with_valid_columns_returns_preparsed_query(self): + """Test that queries with all valid column names return PreparsedQuery.""" + from datetime import datetime, timezone + + from datahub.ingestion.source.snowflake.snowflake_utils import ( + SnowflakeIdentifierBuilder, + ) + from datahub.sql_parsing.sql_parsing_aggregator import PreparsedQuery + + mock_connection = Mock() + config = SnowflakeQueriesExtractorConfig( + window=BaseTimeWindowConfig( + start_time=datetime(2021, 1, 1, tzinfo=timezone.utc), + end_time=datetime(2021, 1, 2, tzinfo=timezone.utc), + ), + ) + mock_report = Mock() + mock_filters = Mock() + mock_identifiers = Mock(spec=SnowflakeIdentifierBuilder) + mock_identifiers.platform = "snowflake" + mock_identifiers.identifier_config = SnowflakeIdentifierConfig() + mock_identifiers.gen_dataset_urn = Mock( + return_value="urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.test_table,PROD)" + ) + mock_identifiers.get_dataset_identifier_from_qualified_name = Mock( + return_value="test_db.test_schema.test_table" + ) + mock_identifiers.snowflake_identifier = Mock(side_effect=lambda x: x) + mock_identifiers.get_user_identifier = Mock(return_value="test_user") + + extractor = SnowflakeQueriesExtractor( + connection=mock_connection, + config=config, + structured_report=mock_report, + filters=mock_filters, + identifiers=mock_identifiers, + ) + + # Simulate a Snowflake access history row with valid column names + import json + + row = { + "QUERY_ID": "test_query_456", + "ROOT_QUERY_ID": None, + "QUERY_TEXT": "SELECT id, name FROM test_table", + "QUERY_TYPE": "SELECT", + "SESSION_ID": "session_456", + "USER_NAME": "test_user", + "ROLE_NAME": "test_role", + "QUERY_START_TIME": datetime(2021, 1, 1, 12, 0, 0, tzinfo=timezone.utc), + "END_TIME": datetime(2021, 1, 1, 12, 0, 1, tzinfo=timezone.utc), + "QUERY_DURATION": 1000, + "ROWS_INSERTED": 0, + "ROWS_UPDATED": 0, + "ROWS_DELETED": 0, + "DEFAULT_DB": "test_db", + "DEFAULT_SCHEMA": "test_schema", + "QUERY_COUNT": 1, + "QUERY_SECONDARY_FINGERPRINT": "fingerprint_456", + "DIRECT_OBJECTS_ACCESSED": json.dumps( + [ + { + "objectName": "test_db.test_schema.test_table", + "objectDomain": "Table", + "columns": [ + {"columnName": "id"}, + {"columnName": "name"}, + ], + } + ] + ), + "OBJECTS_MODIFIED": json.dumps([]), + "OBJECT_MODIFIED_BY_DDL": None, + } + + users: dict = {} + + result = extractor._parse_audit_log_row(row, users) + + # Assert that a PreparsedQuery is returned when all columns are valid + assert isinstance(result, PreparsedQuery), ( + f"Expected PreparsedQuery but got {type(result)}" + ) + assert result.query_text == "SELECT id, name FROM test_table" + + class TestSnowflakeQueriesExtractorStatefulTimeWindowIngestion: """Tests for stateful time window ingestion support in queries v2.""" diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_empty_column_in_query_subjects_golden.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_empty_column_in_query_subjects_golden.json new file mode 100644 index 00000000000000..78f01d2561030c --- /dev/null +++ 
b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_empty_column_in_query_subjects_golden.json @@ -0,0 +1,58 @@ +[ +{ + "entityType": "query", + "entityUrn": "urn:li:query:test-delete-query", + "changeType": "UPSERT", + "aspectName": "queryProperties", + "aspect": { + "json": { + "customProperties": {}, + "statement": { + "value": "DELETE FROM PRODUCTION.DCA_CORE.snowplow_user_engagement_mart AS DBT_INTERNAL_DEST\nWHERE\n (\n unique_key_input\n ) IN (\n SELECT DISTINCT\n unique_key_input\n FROM PRODUCTION.DCA_CORE.snowplow_user_engagement_mart__dbt_tmp AS DBT_INTERNAL_SOURCE\n )", + "language": "SQL" + }, + "source": "SYSTEM", + "created": { + "time": 20000, + "actor": "urn:li:corpuser:_ingestion" + }, + "lastModified": { + "time": 20000, + "actor": "urn:li:corpuser:_ingestion" + } + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:test-delete-query", + "changeType": "UPSERT", + "aspectName": "querySubjects", + "aspect": { + "json": { + "subjects": [ + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,production.dca_core.snowplow_user_engagement_mart__dbt_tmp,PROD)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,production.dca_core.snowplow_user_engagement_mart__dbt_tmp,PROD),unique_key_input)" + }, + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,production.dca_core.snowplow_user_engagement_mart,PROD)" + } + ] + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:test-delete-query", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:snowflake" + } + } +} +] diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_empty_column_in_query_subjects_only_column_usage_golden.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_empty_column_in_query_subjects_only_column_usage_golden.json new file mode 100644 index 00000000000000..ca904a32b0725e --- /dev/null +++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_empty_column_in_query_subjects_only_column_usage_golden.json @@ -0,0 +1,58 @@ +[ +{ + "entityType": "query", + "entityUrn": "urn:li:query:test-select-gsheets-view", + "changeType": "UPSERT", + "aspectName": "queryProperties", + "aspect": { + "json": { + "customProperties": {}, + "statement": { + "value": "SELECT\n *\nFROM production.dsd_digital_private.gsheets_legacy_views\nWHERE\n id = 123", + "language": "SQL" + }, + "source": "SYSTEM", + "created": { + "time": 20000, + "actor": "urn:li:corpuser:_ingestion" + }, + "lastModified": { + "time": 20000, + "actor": "urn:li:corpuser:_ingestion" + } + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:test-select-gsheets-view", + "changeType": "UPSERT", + "aspectName": "querySubjects", + "aspect": { + "json": { + "subjects": [ + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,production.dsd_digital_private.gsheets_legacy_views,PROD)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,production.dsd_digital_private.gsheets_legacy_views,PROD),id)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,production.dsd_digital_private.gsheets_legacy_views,PROD),name)" + } + ] + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:test-select-gsheets-view", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:snowflake" + 
       }
+    }
+}
+]
diff --git a/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py b/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py
index 98ed52c6c30903..b4ca2630b02677 100644
--- a/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py
+++ b/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py
@@ -1297,3 +1297,145 @@ def test_partial_empty_downstream_column_in_snowflake_lineage(
         RESOURCE_DIR
         / "test_partial_empty_downstream_column_in_snowflake_lineage_golden.json",
     )
+
+
+@freeze_time(FROZEN_TIME)
+def test_empty_column_in_query_subjects(
+    pytestconfig: pytest.Config, tmp_path: pathlib.Path
+) -> None:
+    """Test that QuerySubjects with empty column names doesn't create invalid URNs.
+
+    This simulates the Snowflake scenario where DELETE queries return empty column
+    names in access_history, which should not result in invalid schemaField URNs.
+    """
+    aggregator = SqlParsingAggregator(
+        platform="snowflake",
+        generate_lineage=True,
+        generate_usage_statistics=False,
+        generate_operations=False,
+        generate_queries=True,
+        generate_query_subject_fields=True,
+    )
+
+    downstream_urn = DatasetUrn(
+        "snowflake", "production.dca_core.snowplow_user_engagement_mart"
+    ).urn()
+    upstream_urn = DatasetUrn(
+        "snowflake", "production.dca_core.snowplow_user_engagement_mart__dbt_tmp"
+    ).urn()
+
+    # Simulate a DELETE query with subquery where Snowflake returns empty columns
+    preparsed_query = PreparsedQuery(
+        query_id="test-delete-query",
+        query_text=(
+            "delete from PRODUCTION.DCA_CORE.snowplow_user_engagement_mart "
+            "as DBT_INTERNAL_DEST where (unique_key_input) in ("
+            "select distinct unique_key_input from "
+            "PRODUCTION.DCA_CORE.snowplow_user_engagement_mart__dbt_tmp "
+            "as DBT_INTERNAL_SOURCE)"
+        ),
+        upstreams=[upstream_urn],
+        downstream=downstream_urn,
+        column_lineage=[
+            # Snowflake returns empty column names for DELETE operations
+            ColumnLineageInfo(
+                downstream=DownstreamColumnRef(table=downstream_urn, column=""),
+                upstreams=[ColumnRef(table=upstream_urn, column="unique_key_input")],
+            ),
+        ],
+        column_usage={
+            upstream_urn: {"unique_key_input", ""},  # Empty column from Snowflake
+        },
+        query_type=QueryType.DELETE,
+        timestamp=_ts(20),
+    )
+
+    aggregator.add_preparsed_query(preparsed_query)
+
+    mcpws = [mcp for mcp in aggregator.gen_metadata()]
+    query_mcpws = [
+        mcpw
+        for mcpw in mcpws
+        if mcpw.entityUrn and mcpw.entityUrn.startswith("urn:li:query:")
+    ]
+
+    out_path = tmp_path / "mcpw.json"
+    write_metadata_file(out_path, query_mcpws)
+
+    mce_helpers.check_golden_file(
+        pytestconfig,
+        out_path,
+        RESOURCE_DIR / "test_empty_column_in_query_subjects_golden.json",
+    )
+
+
+@freeze_time(FROZEN_TIME)
+def test_empty_column_in_query_subjects_only_column_usage(
+    pytestconfig: pytest.Config, tmp_path: pathlib.Path
+) -> None:
+    """Test that QuerySubjects with empty columns ONLY in column_usage doesn't create invalid URNs.
+
+    This simulates the exact customer scenario where:
+    - Snowflake returns empty columns in direct_objects_accessed (column_usage)
+    - But NO empty columns in objects_modified (column_lineage is empty or valid)
+
+    This is the scenario that would send invalid URNs to GMS rather than crash in Python,
+    matching the customer's error: "Provided urn urn:li:schemaField:(...,) is invalid"
+
+    Example: SELECT queries on views over Google Sheets or other special data sources
+    where column tracking fails for reads but there's no downstream table.
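+
+    For reference, an empty column name would previously have produced a
+    schemaField URN with an empty field path (the dataset URN below is
+    illustrative, not taken from real data):
+
+        urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,db.schema.table,PROD),)
+
+    which GMS rejects as invalid.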
+ """ + aggregator = SqlParsingAggregator( + platform="snowflake", + generate_lineage=True, + generate_usage_statistics=False, + generate_operations=False, + generate_queries=True, + generate_query_subject_fields=True, + generate_query_usage_statistics=True, + usage_config=BaseUsageConfig( + bucket_duration=BucketDuration.DAY, + start_time=parse_user_datetime("2024-02-06T00:00:00Z"), + end_time=parse_user_datetime("2024-02-07T00:00:00Z"), + ), + ) + + # Simulate table name from customer: production.dsd_digital_private.gsheets_legacy_views + upstream_urn = DatasetUrn( + "snowflake", "production.dsd_digital_private.gsheets_legacy_views" + ).urn() + + # Simulate a SELECT query (no downstream) where Snowflake has empty column tracking + # This is common with views over external data sources like Google Sheets + preparsed_query = PreparsedQuery( + query_id="test-select-gsheets-view", + query_text="SELECT * FROM production.dsd_digital_private.gsheets_legacy_views WHERE id = 123", + upstreams=[upstream_urn], + downstream=None, # SELECT query has no downstream + column_lineage=[], # No column lineage because no downstream + column_usage={ + # Snowflake returns empty column names for problematic views + upstream_urn: {"id", "name", ""}, # Empty column from Snowflake! + }, + query_type=QueryType.SELECT, + timestamp=_ts(20), + ) + + aggregator.add_preparsed_query(preparsed_query) + + mcpws = [mcp for mcp in aggregator.gen_metadata()] + query_mcpws = [ + mcpw + for mcpw in mcpws + if mcpw.entityUrn and mcpw.entityUrn.startswith("urn:li:query:") + ] + + out_path = tmp_path / "mcpw.json" + write_metadata_file(out_path, query_mcpws) + + mce_helpers.check_golden_file( + pytestconfig, + out_path, + RESOURCE_DIR + / "test_empty_column_in_query_subjects_only_column_usage_golden.json", + )
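+
+    # Illustrative extra check on top of the golden comparison: a schemaField
+    # URN built from an empty column name would end in ",)", so no emitted
+    # subject URN should have that shape.
+    for mcpw in query_mcpws:
+        if mcpw.aspectName == "querySubjects":
+            assert mcpw.aspect is not None
+            for subject in mcpw.aspect.subjects:
+                assert not subject.entity.endswith(",)")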