@@ -617,6 +617,7 @@ def _parse_audit_log_row(
upstreams = []
column_usage = {}

has_empty_column = False
Collaborator:
do we need to introduce a new flag? Why don't we return ObservedQuery directly from the loop, instead of having break-related logic?

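A rough sketch of that early-return approach (illustrative only; it reuses names from the surrounding method - query_text, res, timestamp, user, extra_info - and is not a drop-in diff):

    for obj in direct_objects_accessed:
        dataset = self.identifiers.gen_dataset_urn(
            self.identifiers.get_dataset_identifier_from_qualified_name(
                obj["objectName"]
            )
        )

        columns = set()
        for modified_column in obj["columns"]:
            column_name = modified_column["columnName"]
            if not column_name or not column_name.strip():
                # Empty column name in the audit log: fall back to SQL parsing.
                return ObservedQuery(
                    query=query_text,
                    session_id=res["session_id"],
                    timestamp=timestamp,
                    user=user,
                    default_db=res["default_db"],
                    default_schema=res["default_schema"],
                    query_hash=get_query_fingerprint(
                        query_text, self.identifiers.platform, fast=True
                    ),
                    extra_info=extra_info,
                )
            columns.add(self.identifiers.snowflake_identifier(column_name))

        upstreams.append(dataset)
        column_usage[dataset] = columns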
for obj in direct_objects_accessed:
dataset = self.identifiers.gen_dataset_urn(
self.identifiers.get_dataset_identifier_from_qualified_name(
@@ -626,13 +627,32 @@

columns = set()
for modified_column in obj["columns"]:
columns.add(
self.identifiers.snowflake_identifier(modified_column["columnName"])
)
column_name = modified_column["columnName"]
Collaborator:
We need to make it clearly visible when we decide to parse a query for which we would otherwise use info coming directly from the audit log. This matters for two reasons:

  1. We want to understand why Snowflake would produce an audit log that is, from our perspective, malformed. It would be best to also be able to pinpoint the query involved.
  2. Parsing queries takes much longer than just copying information from the audit log, so this change has potential adverse effects on overall ingestion performance. We need to be aware of how many queries we had to parse ourselves.

So to meet the above conditions we need to:

  1. Extend the report object for the Snowflake source so that we can keep a count of such queries. Saving the query_id of each query that was forced to be parsed might also be a good idea - use a LossyList so we don't store too many. Such a query_id could be used to retrieve the actual query from the warehouse (a rough sketch follows below this list).
  2. Print information that this happened. I think at least info level should be used, maybe even warning. It is an open question whether we should go as far as using self.report.warning - in that case the message would appear in the Managed Ingestion UI, which might be overkill. WDYT?

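A minimal sketch of what point 1 could look like, assuming the extractor's report class and field names of my own choosing (LossyList from datahub.utilities.lossy_collections); illustrative only, not the PR's code:

    from dataclasses import dataclass, field

    from datahub.utilities.lossy_collections import LossyList


    @dataclass
    class SnowflakeQueriesExtractorReport:  # illustrative subset of the real report
        # How many queries fell back to SQL parsing because the audit log
        # contained empty/whitespace column names.
        num_queries_parsed_due_to_empty_columns: int = 0
        # Sample of the affected query ids, bounded by LossyList.
        queries_parsed_due_to_empty_columns: LossyList[str] = field(default_factory=LossyList)

    # In _parse_audit_log_row, when the fallback is triggered (the key name for
    # the query id is assumed here):
    #     self.report.num_queries_parsed_due_to_empty_columns += 1
    #     self.report.queries_parsed_due_to_empty_columns.append(res["query_id"])
    #     logger.info("Empty column name in audit log; parsing query %s ourselves", res["query_id"])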
if not column_name or not column_name.strip():
Collaborator:
Thank you for also addressing the case of non-empty column names that contain only whitespace!

has_empty_column = True
Collaborator:
I would also add a comment explaining why we decide to parse the query ourselves when there are empty column names - one possible wording is sketched below.

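One possible wording (illustrative):

    if not column_name or not column_name.strip():
        # Snowflake's audit log sometimes reports empty or whitespace-only column
        # names; in that case its column-level info is unreliable, so we fall back
        # to parsing the query text ourselves instead of using the preparsed path.
        has_empty_column = True
        break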
break
columns.add(self.identifiers.snowflake_identifier(column_name))

if has_empty_column:
break

upstreams.append(dataset)
column_usage[dataset] = columns

if has_empty_column:
return ObservedQuery(
query=query_text,
session_id=res["session_id"],
timestamp=timestamp,
user=user,
default_db=res["default_db"],
default_schema=res["default_schema"],
query_hash=get_query_fingerprint(
query_text, self.identifiers.platform, fast=True
),
extra_info=extra_info,
)

downstream = None
column_lineage = None
for obj in objects_modified:
@@ -168,13 +168,22 @@ def get_subjects(
query_subject_urns.add(upstream)
if include_fields:
for column in sorted(self.column_usage.get(upstream, [])):
# Skip empty column names to avoid creating invalid URNs
Collaborator:
I think we need to print a message here, either warning or info. Same as below.

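Something along these lines could work, assuming a module-level logger exists in this file (illustrative):

    if not column or not column.strip():
        logger.warning(
            "Skipping empty column name for upstream %s while building query subjects",
            upstream,
        )
        continue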
if not column or not column.strip():
continue
query_subject_urns.add(
builder.make_schema_field_urn(upstream, column)
)
if downstream_urn:
query_subject_urns.add(downstream_urn)
if include_fields:
for column_lineage in self.column_lineage:
# Skip empty downstream columns to avoid creating invalid URNs
if (
not column_lineage.downstream.column
or not column_lineage.downstream.column.strip()
):
continue
query_subject_urns.add(
builder.make_schema_field_urn(
downstream_urn, column_lineage.downstream.column
178 changes: 178 additions & 0 deletions metadata-ingestion/tests/unit/snowflake/test_snowflake_queries.py
@@ -605,6 +605,184 @@ def test_report_counts_with_disabled_features(self):
assert extractor.report.sql_aggregator.num_preparsed_queries == 0


class TestSnowflakeQueryParser:
Collaborator:
This test is awesome! Maybe the comments should make it clearer that we are testing the case where Snowflake sends us somehow-corrupted results.
Also - why are the imports done inside the functions? Can't we move them to the top? (See the sketch below.)
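For the second point, the per-test imports could simply move to the module top (all of these already appear inside the test functions):

    import json
    from datetime import datetime, timezone

    from datahub.ingestion.source.snowflake.snowflake_utils import (
        SnowflakeIdentifierBuilder,
    )
    from datahub.sql_parsing.sql_parsing_aggregator import ObservedQuery, PreparsedQuery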

"""Tests for the SnowflakeQueriesExtractor._parse_query method."""

def test_parse_query_with_empty_column_name_returns_observed_query(self):
"""Test that queries with empty column names in direct_objects_accessed return ObservedQuery."""
from datetime import datetime, timezone

from datahub.ingestion.source.snowflake.snowflake_utils import (
SnowflakeIdentifierBuilder,
)
from datahub.sql_parsing.sql_parsing_aggregator import ObservedQuery

mock_connection = Mock()
config = SnowflakeQueriesExtractorConfig(
window=BaseTimeWindowConfig(
start_time=datetime(2021, 1, 1, tzinfo=timezone.utc),
end_time=datetime(2021, 1, 2, tzinfo=timezone.utc),
),
)
mock_report = Mock()
mock_filters = Mock()
mock_identifiers = Mock(spec=SnowflakeIdentifierBuilder)
mock_identifiers.platform = "snowflake"
mock_identifiers.identifier_config = SnowflakeIdentifierConfig()
mock_identifiers.gen_dataset_urn = Mock(
return_value="urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.test_table,PROD)"
)
mock_identifiers.get_dataset_identifier_from_qualified_name = Mock(
return_value="test_db.test_schema.test_table"
)
mock_identifiers.snowflake_identifier = Mock(side_effect=lambda x: x)
mock_identifiers.get_user_identifier = Mock(return_value="test_user")

extractor = SnowflakeQueriesExtractor(
connection=mock_connection,
config=config,
structured_report=mock_report,
filters=mock_filters,
identifiers=mock_identifiers,
)

# Simulate a Snowflake access history row with empty column name
import json

row = {
"QUERY_ID": "test_query_123",
"ROOT_QUERY_ID": None,
"QUERY_TEXT": "SELECT * FROM test_table WHERE id = 1",
"QUERY_TYPE": "SELECT",
"SESSION_ID": "session_123",
"USER_NAME": "test_user",
"ROLE_NAME": "test_role",
"QUERY_START_TIME": datetime(2021, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
"END_TIME": datetime(2021, 1, 1, 12, 0, 1, tzinfo=timezone.utc),
"QUERY_DURATION": 1000,
"ROWS_INSERTED": 0,
"ROWS_UPDATED": 0,
"ROWS_DELETED": 0,
"DEFAULT_DB": "test_db",
"DEFAULT_SCHEMA": "test_schema",
"QUERY_COUNT": 1,
"QUERY_SECONDARY_FINGERPRINT": "fingerprint_123",
"DIRECT_OBJECTS_ACCESSED": json.dumps(
[
{
"objectName": "test_db.test_schema.test_table",
"objectDomain": "Table",
"columns": [
{"columnName": "id"},
{"columnName": ""}, # Empty column name
{"columnName": "name"},
],
}
]
),
"OBJECTS_MODIFIED": json.dumps([]),
"OBJECT_MODIFIED_BY_DDL": None,
}

users: dict = {}

result = extractor._parse_audit_log_row(row, users)

# Assert that an ObservedQuery is returned when there's an empty column
assert isinstance(result, ObservedQuery), (
f"Expected ObservedQuery but got {type(result)}"
)
assert result.query == "SELECT * FROM test_table WHERE id = 1"
assert result.session_id == "session_123"
assert result.default_db == "test_db"
assert result.default_schema == "test_schema"

def test_parse_query_with_valid_columns_returns_preparsed_query(self):
"""Test that queries with all valid column names return PreparsedQuery."""
from datetime import datetime, timezone

from datahub.ingestion.source.snowflake.snowflake_utils import (
SnowflakeIdentifierBuilder,
)
from datahub.sql_parsing.sql_parsing_aggregator import PreparsedQuery

mock_connection = Mock()
config = SnowflakeQueriesExtractorConfig(
window=BaseTimeWindowConfig(
start_time=datetime(2021, 1, 1, tzinfo=timezone.utc),
end_time=datetime(2021, 1, 2, tzinfo=timezone.utc),
),
)
mock_report = Mock()
mock_filters = Mock()
mock_identifiers = Mock(spec=SnowflakeIdentifierBuilder)
mock_identifiers.platform = "snowflake"
mock_identifiers.identifier_config = SnowflakeIdentifierConfig()
mock_identifiers.gen_dataset_urn = Mock(
return_value="urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.test_table,PROD)"
)
mock_identifiers.get_dataset_identifier_from_qualified_name = Mock(
return_value="test_db.test_schema.test_table"
)
mock_identifiers.snowflake_identifier = Mock(side_effect=lambda x: x)
mock_identifiers.get_user_identifier = Mock(return_value="test_user")

extractor = SnowflakeQueriesExtractor(
connection=mock_connection,
config=config,
structured_report=mock_report,
filters=mock_filters,
identifiers=mock_identifiers,
)

# Simulate a Snowflake access history row with valid column names
import json

row = {
"QUERY_ID": "test_query_456",
"ROOT_QUERY_ID": None,
"QUERY_TEXT": "SELECT id, name FROM test_table",
"QUERY_TYPE": "SELECT",
"SESSION_ID": "session_456",
"USER_NAME": "test_user",
"ROLE_NAME": "test_role",
"QUERY_START_TIME": datetime(2021, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
"END_TIME": datetime(2021, 1, 1, 12, 0, 1, tzinfo=timezone.utc),
"QUERY_DURATION": 1000,
"ROWS_INSERTED": 0,
"ROWS_UPDATED": 0,
"ROWS_DELETED": 0,
"DEFAULT_DB": "test_db",
"DEFAULT_SCHEMA": "test_schema",
"QUERY_COUNT": 1,
"QUERY_SECONDARY_FINGERPRINT": "fingerprint_456",
"DIRECT_OBJECTS_ACCESSED": json.dumps(
[
{
"objectName": "test_db.test_schema.test_table",
"objectDomain": "Table",
"columns": [
{"columnName": "id"},
{"columnName": "name"},
],
}
]
),
"OBJECTS_MODIFIED": json.dumps([]),
"OBJECT_MODIFIED_BY_DDL": None,
}

users: dict = {}

result = extractor._parse_audit_log_row(row, users)

# Assert that a PreparsedQuery is returned when all columns are valid
assert isinstance(result, PreparsedQuery), (
f"Expected PreparsedQuery but got {type(result)}"
)
assert result.query_text == "SELECT id, name FROM test_table"


class TestSnowflakeQueriesExtractorStatefulTimeWindowIngestion:
"""Tests for stateful time window ingestion support in queries v2."""

@@ -0,0 +1,58 @@
[
{
"entityType": "query",
"entityUrn": "urn:li:query:test-delete-query",
"changeType": "UPSERT",
"aspectName": "queryProperties",
"aspect": {
"json": {
"customProperties": {},
"statement": {
"value": "DELETE FROM PRODUCTION.DCA_CORE.snowplow_user_engagement_mart AS DBT_INTERNAL_DEST\nWHERE\n (\n unique_key_input\n ) IN (\n SELECT DISTINCT\n unique_key_input\n FROM PRODUCTION.DCA_CORE.snowplow_user_engagement_mart__dbt_tmp AS DBT_INTERNAL_SOURCE\n )",
"language": "SQL"
},
"source": "SYSTEM",
"created": {
"time": 20000,
"actor": "urn:li:corpuser:_ingestion"
},
"lastModified": {
"time": 20000,
"actor": "urn:li:corpuser:_ingestion"
}
}
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:test-delete-query",
"changeType": "UPSERT",
"aspectName": "querySubjects",
"aspect": {
"json": {
"subjects": [
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,production.dca_core.snowplow_user_engagement_mart__dbt_tmp,PROD)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,production.dca_core.snowplow_user_engagement_mart__dbt_tmp,PROD),unique_key_input)"
},
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,production.dca_core.snowplow_user_engagement_mart,PROD)"
}
]
}
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:test-delete-query",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:snowflake"
}
}
}
]
@@ -0,0 +1,58 @@
[
{
"entityType": "query",
"entityUrn": "urn:li:query:test-select-gsheets-view",
"changeType": "UPSERT",
"aspectName": "queryProperties",
"aspect": {
"json": {
"customProperties": {},
"statement": {
"value": "SELECT\n *\nFROM production.dsd_digital_private.gsheets_legacy_views\nWHERE\n id = 123",
"language": "SQL"
},
"source": "SYSTEM",
"created": {
"time": 20000,
"actor": "urn:li:corpuser:_ingestion"
},
"lastModified": {
"time": 20000,
"actor": "urn:li:corpuser:_ingestion"
}
}
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:test-select-gsheets-view",
"changeType": "UPSERT",
"aspectName": "querySubjects",
"aspect": {
"json": {
"subjects": [
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,production.dsd_digital_private.gsheets_legacy_views,PROD)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,production.dsd_digital_private.gsheets_legacy_views,PROD),id)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,production.dsd_digital_private.gsheets_legacy_views,PROD),name)"
}
]
}
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:test-select-gsheets-view",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:snowflake"
}
}
}
]