fix(ingest): Handle empty column names from Snowflake access history #15106
base: master
Conversation
Snowflake's access history can return empty column names for certain query types (e.g., DELETE, queries on views over external sources like Google Sheets). This was causing invalid schemaField URNs to be sent to GMS. This fix adds two layers of protection:

1. At the ingestion source level: detect empty columns in direct_objects_accessed and fall back to ObservedQuery for DataHub's own SQL parsing.
2. At query subjects generation: skip empty column names when creating schemaField URNs to prevent invalid URN generation.
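The second layer of protection can be sketched roughly as follows. This is a minimal illustration, not DataHub's actual API: the function name and the URN string formatting below are invented for the example, but the skip condition matches the check discussed in this PR.

```python
def make_schema_field_urns(dataset_urn: str, columns: list) -> list:
    """Build schemaField URNs, skipping empty or whitespace-only column names.

    An empty column name would otherwise produce a malformed URN such as
    urn:li:schemaField:(urn:li:dataset:(...),) which GMS rejects.
    """
    urns = []
    for column_name in columns:
        # Skip empty column names to avoid creating invalid URNs.
        if not column_name or not column_name.strip():
            continue
        urns.append(f"urn:li:schemaField:({dataset_urn},{column_name})")
    return urns
```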
Codecov Report: ✅ All modified and coverable lines are covered by tests.
Meticulous: ✅ 0 visual differences across 1016 screens tested (~8 hours of user flows evaluated against this PR; last updated for commit ecd54eb).
Bundle Report: bundle size has no change ✅
upstreams = []
column_usage = {}

has_empty_column = False
Do we need to introduce a new flag? Why not return ObservedQuery directly from the loop instead of having break-related logic?
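The suggestion amounts to replacing the flag-and-break pattern with an early return. A hypothetical sketch, with stub classes standing in for DataHub's ObservedQuery and PreparsedQuery types and an invented entry layout based on the access-history fields named in this PR:

```python
from dataclasses import dataclass


@dataclass
class ObservedQuery:
    """Stub: a query DataHub will parse itself."""
    query: str


@dataclass
class PreparsedQuery:
    """Stub: a query whose lineage comes straight from the audit log."""
    query: str


def parse_access_history_entry(entry: dict):
    # Return ObservedQuery as soon as a malformed (empty) column name is
    # seen, instead of setting has_empty_column and breaking out of the loop.
    for obj in entry.get("direct_objects_accessed", []):
        for column in obj.get("columns", []):
            name = column.get("columnName")
            if not name or not name.strip():
                return ObservedQuery(query=entry["query_text"])
    # Audit log looks complete: use its metadata without re-parsing the SQL.
    return PreparsedQuery(query=entry["query_text"])
```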
self.identifiers.snowflake_identifier(modified_column["columnName"])
)
column_name = modified_column["columnName"]
if not column_name or not column_name.strip():
Thank you for also addressing the case of non-empty column names that contain only whitespace!
columns.add(
    self.identifiers.snowflake_identifier(modified_column["columnName"])
)
column_name = modified_column["columnName"]
We need to make it very visible that we decided to parse a query for which we would otherwise use info coming directly from the audit log, for two reasons:
- We want to understand why Snowflake would produce such, from our perspective, malformed audit log entries. It would be best to be able to pinpoint the query involved as well.
- Parsing queries takes much longer than just copying information from the audit log. This change has potential adverse effects on overall ingestion performance, so we need to know how many queries had to be parsed by us.

So to meet the above conditions we need to:
- Extend the `report` object for the Snowflake source so that we can keep a count of these queries. Maybe saving the `query_id` of each query which was forced to be parsed would be a good idea - use `LossyList` to not store too many. Such a `query_id` could be used to retrieve the actual query from the warehouse.
- Print information that this happened. I think at least `info` level should be used, maybe even `warning`. It is an open question whether we should go as far as using `self.report.warning` - in that case the message would appear in the Managed Ingestion UI, which may be overkill. WDYT?
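A sketch of the suggested report extension. `LossyList` is DataHub's actual bounded-list type; here a plain capped list stands in for it so the snippet is self-contained, and the field and method names are hypothetical:

```python
import logging
from dataclasses import dataclass, field

logger = logging.getLogger(__name__)

MAX_TRACKED_QUERY_IDS = 100  # cap, mimicking LossyList's bounded storage


@dataclass
class SnowflakeQueriesReport:
    # How many audit-log entries with empty column names forced SQL parsing.
    num_queries_parsed_due_to_empty_columns: int = 0
    # Bounded sample of affected query_ids, usable to retrieve the actual
    # query text from the warehouse later.
    queries_parsed_due_to_empty_columns: list = field(default_factory=list)

    def record_forced_parse(self, query_id: str) -> None:
        self.num_queries_parsed_due_to_empty_columns += 1
        if len(self.queries_parsed_due_to_empty_columns) < MAX_TRACKED_QUERY_IDS:
            self.queries_parsed_due_to_empty_columns.append(query_id)
        # Surface the fallback so operators can investigate the audit log.
        logger.warning(
            "Empty column name in Snowflake access history; falling back to "
            "SQL parsing for query_id=%s",
            query_id,
        )
```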
query_subject_urns.add(upstream)
if include_fields:
    for column in sorted(self.column_usage.get(upstream, [])):
        # Skip empty column names to avoid creating invalid URNs
I think we need to print a message here, either warning or info. Same as below.
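Printing a message at the skip site could look like the following. This is a sketch: the generator name is invented, and whether `warning` or `info` is the right level is exactly the open question above.

```python
import logging

logger = logging.getLogger(__name__)


def iter_valid_columns(dataset_urn: str, columns):
    """Yield usable column names, logging any empty ones that get skipped."""
    for column in columns:
        if not column or not column.strip():
            # Log the skip so malformed audit-log entries are traceable.
            logger.warning(
                "Skipping empty column name for %s while generating "
                "schemaField URNs",
                dataset_urn,
            )
            continue
        yield column
```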
)
column_name = modified_column["columnName"]
if not column_name or not column_name.strip():
    has_empty_column = True
I would also add a comment explaining why we are deciding to parse the query ourselves in cases where there are empty column names.
assert extractor.report.sql_aggregator.num_preparsed_queries == 0


class TestSnowflakeQueryParser:
This test is awesome! Maybe the comments should make it clearer that we are testing the case where Snowflake sends us somehow corrupted results.
Also - why are the imports done inside the functions? Can't we move them to the top?
) -> None:
    """Test that QuerySubjects with empty column names doesn't create invalid URNs.

    This simulates the Snowflake scenario where DELETE queries return empty column
I think this is an overstatement - we haven't been able to identify queries or table types for which it happens, let's remove indication about DELETE queries.
"snowflake", "production.dca_core.snowplow_user_engagement_mart__dbt_tmp"
).urn()


# Simulate a DELETE query with subquery where Snowflake returns empty columns
I think this is an overstatement - we haven't been able to identify queries or table types for which it happens, let's remove indication about DELETE queries.
upstreams=[upstream_urn],
downstream=downstream_urn,
column_lineage=[
    # Snowflake returns empty column names for DELETE operations
I think this is an overstatement - we haven't been able to identify queries or table types for which it happens, let's remove indication about DELETE queries.
(I mean only the comment, test code is great, please keep it!)
This is the scenario that would send invalid URNs to GMS rather than crash in Python,
matching the customer's error: "Provided urn urn:li:schemaField:(...,) is invalid"
Example: SELECT queries on views over Google Sheets or other special data sources
I think this is an overstatement - we haven't been able to identify queries or table types for which it happens, let's remove indication about Google Sheets or other special sources.
).urn()


# Simulate a SELECT query (no downstream) where Snowflake has empty column tracking
# This is common with views over external data sources like Google Sheets
I think this is an overstatement - we haven't been able to identify queries or table types for which it happens, let's remove indication about Google Sheets or other special sources.
downstream=None,  # SELECT query has no downstream
column_lineage=[],  # No column lineage because no downstream
column_usage={
    # Snowflake returns empty column names for problematic views
I think this is an overstatement - we haven't been able to identify queries or table types for which it happens, let's remove indication about Google Sheets or other special sources.
),
)


# Simulate table name from customer: production.dsd_digital_private.gsheets_legacy_views
Suggested change:
- # Simulate table name from customer: production.dsd_digital_private.gsheets_legacy_views
+ # Simulate table name from user: production.dsd_digital_private.gsheets_legacy_views
I greatly appreciate your meticulous approach to unit tests, which look exactly like proper unit tests should! I have left just a couple of comments.