
@treff7es
Contributor

Summary

This PR implements unified stateful time window ingestion for the BigQuery and Snowflake queries extractors (queries v2), replacing the previous lineage-specific approach with a more flexible time window-based handler. The new implementation supports bucket alignment for usage statistics aggregation and works consistently across all queries v2 features (lineage, usage, operations, queries).

Changes

Feature Implementation

Unified State Handler:

  • Replaced RedundantLineageRunSkipHandler with RedundantQueriesRunSkipHandler
  • Handler is now always active when provided (not conditional on include_lineage)
  • State tracking now works for all queries v2 features, not just lineage

Bucket Alignment for Usage Statistics:

  • Added automatic start time alignment to bucket boundaries when include_usage_statistics=True
  • Supports both daily and hourly bucket durations via config.window.bucket_duration
  • Ensures complete bucket periods for accurate usage aggregation:
    • Daily buckets: Rounds start time down to 00:00:00 of the day
    • Hourly buckets: Rounds start time down to HH:00:00 of the hour
  • Alignment uses existing get_time_bucket() function in time_window_config.py
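
To make the rounding rule concrete, here is a minimal sketch of the alignment. It is a standalone stand-in, not the actual get_time_bucket() helper (whose signature is not shown in this description); the BucketDuration enum and align_start_to_bucket function below are hypothetical names defined only for illustration.

```python
from datetime import datetime, timezone
from enum import Enum


class BucketDuration(str, Enum):
    # Illustrative stand-in covering only the two durations named above.
    DAY = "DAY"
    HOUR = "HOUR"


def align_start_to_bucket(start: datetime, bucket: BucketDuration) -> datetime:
    """Round `start` down to the beginning of the bucket that contains it."""
    if bucket is BucketDuration.HOUR:
        # Hourly buckets: keep the hour, zero out everything below it.
        return start.replace(minute=0, second=0, microsecond=0)
    # Daily buckets: round down to 00:00:00 of the same day.
    return start.replace(hour=0, minute=0, second=0, microsecond=0)


start = datetime(2024, 1, 1, 14, 37, 12, tzinfo=timezone.utc)
daily = align_start_to_bucket(start, BucketDuration.DAY)
hourly = align_start_to_bucket(start, BucketDuration.HOUR)
print(daily.isoformat())   # 2024-01-01T00:00:00+00:00
print(hourly.isoformat())  # 2024-01-01T14:00:00+00:00
```

Rounding down (never up) means the window can only grow, so the first bucket in the run is always complete.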

State Updates:

  • update_state() now includes bucket_duration parameter for proper state tracking
  • State is updated after successful extraction for all features
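
The ordering described above (extract first, record state only on success) can be sketched roughly as follows. InMemorySkipHandler and run_extraction are hypothetical stand-ins written for this illustration; they are not the RedundantQueriesRunSkipHandler API, and the update_state argument order shown here is an assumption.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Callable, List, Optional, Tuple

Window = Tuple[datetime, datetime, str]


@dataclass
class InMemorySkipHandler:
    """Hypothetical stand-in for the skip handler; keeps state in memory only."""

    last_window: Optional[Window] = None

    def update_state(
        self, start_time: datetime, end_time: datetime, bucket_duration: str
    ) -> None:
        self.last_window = (start_time, end_time, bucket_duration)


def run_extraction(
    handler: Optional[InMemorySkipHandler],
    start: datetime,
    end: datetime,
    bucket_duration: str,
    extract: Callable[[datetime, datetime], List[str]],
) -> List[str]:
    # If extract() raises, update_state() is never reached, so a failed run
    # leaves the previously stored window untouched.
    results = extract(start, end)
    if handler is not None:
        handler.update_state(start, end, bucket_duration)
    return results


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
end = datetime(2024, 1, 2, tzinfo=timezone.utc)
handler = InMemorySkipHandler()
rows = run_extraction(handler, start, end, "DAY", lambda s, e: ["query-1"])
```

Passing handler=None models a run without stateful ingestion: extraction still happens, but nothing is recorded.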

Files Modified:

  • src/datahub/ingestion/source/bigquery_v2/queries_extractor.py
  • src/datahub/ingestion/source/snowflake/snowflake_queries.py
  • src/datahub/ingestion/source/state/redundant_run_skip_handler.py

Test Coverage

Updated and enhanced test suites for both BigQuery and Snowflake:

Files Modified:

  • tests/unit/bigquery/test_bigquery_queries.py
  • tests/unit/snowflake/test_snowflake_queries.py

Test Updates:

  1. Renamed test classes from *StatefulLineageIngestion to *StatefulTimeWindowIngestion
  2. Updated imports to use RedundantQueriesRunSkipHandler
  3. Removed lineage-specific conditional tests (handler now always active)
  4. Updated update_state assertions to include BucketDuration parameter
  5. Added comprehensive bucket alignment tests:
    • test_bucket_alignment_with_usage_statistics - Daily bucket alignment
    • test_bucket_alignment_hourly_with_usage_statistics - Hourly bucket alignment
    • test_no_bucket_alignment_without_usage_statistics - No alignment when disabled
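
As a rough illustration of the updated update_state assertions (item 4), the pattern looks something like the sketch below. It uses unittest.mock from the standard library; finish_run, the local BucketDuration enum, and the positional argument order are assumptions made for this example and do not reproduce the real test code.

```python
from datetime import datetime, timezone
from enum import Enum
from unittest.mock import MagicMock


class BucketDuration(str, Enum):
    # Local stand-in so the sketch runs without DataHub installed.
    DAY = "DAY"
    HOUR = "HOUR"


START = datetime(2024, 1, 1, tzinfo=timezone.utc)
END = datetime(2024, 1, 2, tzinfo=timezone.utc)


def finish_run(handler, start, end, bucket_duration):
    # Hypothetical code under test: report the processed window to the handler.
    handler.update_state(start, end, bucket_duration)


def test_update_state_includes_bucket_duration():
    handler = MagicMock()
    finish_run(handler, START, END, BucketDuration.HOUR)
    # The assertion fails if bucket_duration is missing or has a different value.
    handler.update_state.assert_called_once_with(START, END, BucketDuration.HOUR)


test_update_state_includes_bucket_duration()
```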

Bug Fix

Fixed mypy Duplicate Module Error:

  • Renamed tests/integration/bigquery_v2/test_bigquery_queries.py to test_bigquery_queries_integration.py
  • Resolves mypy duplicate module name conflict with unit test file

Benefits

  1. More Flexible State Management: State tracking now works for all queries v2 features, not just lineage
  2. Accurate Usage Aggregation: Bucket alignment ensures complete time periods for usage statistics
  3. Configurable Granularity: Supports both hourly and daily bucket durations
  4. Better Incremental Ingestion: Properly tracks time windows for all feature combinations
  5. Cleaner Architecture: Unified handler reduces complexity and conditional logic

Testing

Unit Tests:

  • ✅ All 65 tests pass (8 BigQuery + 57 Snowflake)
  • ✅ New bucket alignment tests verify both daily and hourly configurations
  • ✅ Tests cover handler presence/absence, time window adjustment, state updates

Linting:

  • ruff check passes
  • mypy passes (no duplicate module errors)

Test Execution:

pytest tests/unit/bigquery/test_bigquery_queries.py tests/unit/snowflake/test_snowflake_queries.py -v
# 65 passed in 2.07s

Usage Example

source:
  type: bigquery-queries
  config:
    include_usage_statistics: true
    window:
      start_time: "-7d"
      end_time: "2024-01-01"
      bucket_duration: DAY  # or HOUR for hourly granularity
    stateful_ingestion:
      enabled: true

When stateful ingestion is enabled with usage statistics:
- Start time will be automatically aligned to bucket boundaries
- State is preserved across runs for incremental ingestion
- Works consistently for all queries v2 features

Breaking Changes

None - this is an enhancement that maintains backward compatibility. Existing configurations will continue to work as before.

Related Issues

Addresses the need for unified state management across all queries v2 features and ensures accurate usage statistics aggregation through proper bucket alignment.


@github-actions github-actions bot added the ingestion label (PR or Issue related to the ingestion of metadata) on Oct 17, 2025
@datahub-cyborg datahub-cyborg bot added the needs-review label (Label for PRs that need review from a maintainer) on Oct 17, 2025
@codecov

codecov bot commented Oct 17, 2025

@sgomezvillamor
Contributor

replacing the previous lineage-specific approach with a more flexible time window-based handler

This impacts user-facing config, right? A before/after example would help clarify what we are really addressing here.

Comment on lines +541 to +542
"enable_stateful_lineage_ingestion and enable_stateful_usage_ingestion are deprecated "
"when using use_queries_v2=True. These configs only work with the legacy (non-queries v2) extraction path. "
"For queries v2, use enable_stateful_time_window instead to enable stateful ingestion "
"for the unified time window extraction (lineage + usage + operations + queries)."
Contributor

Additionally, the docs for enable_stateful_lineage_ingestion and enable_stateful_usage_ingestion should be updated to mention that they only work with use_queries_v2=False.

Comment on lines +489 to +492
"enable_stateful_lineage_ingestion and enable_stateful_usage_ingestion are deprecated "
"when using use_queries_v2=True. These configs only work with the legacy (non-queries v2) extraction path. "
"For queries v2, use enable_stateful_time_window instead to enable stateful ingestion "
"for the unified time window extraction (lineage + usage + operations + queries)."
Contributor

Additionally, the docs for enable_stateful_lineage_ingestion and enable_stateful_usage_ingestion should be updated to mention that they only work with use_queries_v2=False.

@datahub-cyborg datahub-cyborg bot added the pending-submitter-response label (Issue/request has been reviewed but requires a response from the submitter) and removed the needs-review label (Label for PRs that need review from a maintainer) on Oct 21, 2025
self.structured_report = structured_report
self.redundant_run_skip_handler = redundant_run_skip_handler

self.start_time, self.end_time = self._get_time_window()
Contributor

If we are not tracking them already, we should log these effective start/end times or add them to the report.

Contributor Author

We log them.

Comment on lines +534 to +533
@root_validator(pre=False, skip_on_failure=True)
def validate_queries_v2_stateful_ingestion(cls, values: Dict) -> Dict:
Contributor

FYI I'm replacing these v1 validators in #15057

Comment on lines +482 to +483
@root_validator(pre=False, skip_on_failure=True)
def validate_queries_v2_stateful_ingestion(cls, values: Dict) -> Dict:
Contributor

@sgomezvillamor sgomezvillamor Oct 21, 2025

FYI I'm replacing these v1 validators in #15057

Contributor

@sgomezvillamor sgomezvillamor left a comment

LGTM

@datahub-cyborg datahub-cyborg bot added the pending-submitter-merge label and removed the pending-submitter-response label (Issue/request has been reviewed but requires a response from the submitter) on Oct 21, 2025
@treff7es
Contributor Author

Documentation updates completed:

Changes made:

Updated field documentation in stateful_ingestion_base.py for:

  1. enable_stateful_lineage_ingestion (lines 100-107)
  2. enable_stateful_usage_ingestion (lines 151-158)

Both fields now include the following note:

NOTE: This only works with use_queries_v2=False (legacy extraction path). For queries v2, use enable_stateful_time_window instead.

These changes automatically apply to both BigQuery and Snowflake configs since they inherit from these base mixins (StatefulLineageConfigMixin and StatefulUsageConfigMixin).
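
For readers who want to see the shape of the check discussed in the review, here is a minimal sketch written as a plain function rather than a pydantic validator. The function name check_queries_v2_stateful_flags is hypothetical, and emitting a DeprecationWarning is an assumption: the quoted diff shows only the message text, not whether the real validator warns or raises.

```python
import warnings
from typing import Any, Dict

DEPRECATION_MESSAGE = (
    "enable_stateful_lineage_ingestion and enable_stateful_usage_ingestion are deprecated "
    "when using use_queries_v2=True. These configs only work with the legacy (non-queries v2) extraction path. "
    "For queries v2, use enable_stateful_time_window instead to enable stateful ingestion "
    "for the unified time window extraction (lineage + usage + operations + queries)."
)


def check_queries_v2_stateful_flags(values: Dict[str, Any]) -> Dict[str, Any]:
    """Warn when legacy stateful flags are combined with use_queries_v2=True."""
    legacy_flag_set = bool(
        values.get("enable_stateful_lineage_ingestion")
        or values.get("enable_stateful_usage_ingestion")
    )
    if values.get("use_queries_v2") and legacy_flag_set:
        # Assumption: warn rather than raise, so existing recipes keep loading.
        warnings.warn(DEPRECATION_MESSAGE, DeprecationWarning, stacklevel=2)
    return values


# Legacy path: the flags are still valid, so no warning is emitted.
check_queries_v2_stateful_flags(
    {"use_queries_v2": False, "enable_stateful_lineage_ingestion": True}
)
```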

@treff7es treff7es merged commit e9becdd into master Oct 22, 2025
90 of 107 checks passed
@treff7es treff7es deleted the window_state branch October 22, 2025 09:16