fix: yield partitions for unique stream slices in StreamSlicerPartitionGenerator #508


Merged
merged 4 commits into main on Apr 26, 2025

Conversation

lazebnyi
Contributor

@lazebnyi lazebnyi commented Apr 26, 2025

Some parent streams may return duplicate record IDs in the response. As a result, we can end up with several identical partitions. After the first such partition is processed and closed, processing the next identical partition raises an error because the partition is already closed.

For example, in the source-stripe connector, the payout_balance_transactions stream depends on balance_transactions, which uses the events endpoint to fetch data incrementally. This can lead to multiple events for the same parent with the same state but different values in other fields.
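The failure mode described above can be sketched in a few lines. This is a hypothetical illustration (the record shapes and field names are invented, not Stripe's actual payloads): two parent events that reference the same payout produce identical stream slices, so the child stream would otherwise be handed the same partition twice.

```python
# Two parent events referencing the same payout ID (illustrative data only):
# same parent, same state, different values in other fields.
parent_records = [
    {"id": "po_1", "type": "payout.updated", "amount": 100},
    {"id": "po_1", "type": "payout.updated", "amount": 250},
]

# Each parent record becomes a stream slice for the child stream.
slices = [{"parent_id": r["id"]} for r in parent_records]

# Deduplicating via a hashable representation shows only one slice is distinct.
unique_slices = {tuple(sorted(s.items())) for s in slices}

assert len(slices) == 2         # two slices generated...
assert len(unique_slices) == 1  # ...but only one is actually distinct
```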

Summary by CodeRabbit

  • Bug Fixes
    • Improved partition generation to eliminate duplicate partitions when identical stream slices are encountered.

@github-actions github-actions bot added bug Something isn't working security labels Apr 26, 2025
@lazebnyi lazebnyi requested review from maxi297 and brianjlai April 26, 2025 00:54
@lazebnyi
Contributor Author

lazebnyi commented Apr 26, 2025

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formatting issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

✅ Changes applied successfully.

Contributor

coderabbitai bot commented Apr 26, 2025

📝 Walkthrough

Walkthrough

The StreamSlicerPartitionGenerator class in the Airbyte CDK was updated to prevent yielding duplicate partitions for identical stream slices. This was accomplished by introducing a mechanism to track previously seen stream slices using a hashable representation. A new static method, _make_hashable, was added to recursively convert stream slices into hashable types, enabling efficient duplicate detection. The generate method was modified to use this mechanism, ensuring only unique partitions are yielded. Additionally, the import statements were updated to include the required Hashable type.

Changes

File(s) Change Summary
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py Added _make_hashable static method and updated generate to filter duplicate stream slices using a hashable set.

Sequence Diagram(s)

sequenceDiagram
    participant Caller
    participant StreamSlicerPartitionGenerator

    Caller->>StreamSlicerPartitionGenerator: generate()
    StreamSlicerPartitionGenerator->>StreamSlicerPartitionGenerator: For each stream_slice
    StreamSlicerPartitionGenerator->>StreamSlicerPartitionGenerator: _make_hashable(stream_slice)
    alt stream_slice not seen
        StreamSlicerPartitionGenerator->>StreamSlicerPartitionGenerator: Add to seen_slices
        StreamSlicerPartitionGenerator-->>Caller: yield partition
    else stream_slice seen
        StreamSlicerPartitionGenerator-->>StreamSlicerPartitionGenerator: Skip
    end

Suggested labels

bug

Would you like to add a test to ensure the duplicate filtering logic remains robust in future changes, wdyt?


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 863ba57 and e6c0697.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py
⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Analyze (python)


Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (1)

92-99: Excellent implementation to prevent duplicate partition processing

The approach to tracking unique stream slices using a set and hashable representations is clean and effective. This directly addresses the PR objective of preventing errors when processing duplicate partitions.

A small suggestion: would it be valuable to log when a duplicate slice is skipped, perhaps at debug level? This could help with troubleshooting if needed. wdyt?
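The logging suggestion could look something like the following sketch. The function name and the `make_hashable` default are illustrative placeholders, not the CDK's actual API; the point is only where the debug call would sit.

```python
import logging

logger = logging.getLogger("partition_generator")

def generate_unique(stream_slices, make_hashable=lambda s: tuple(sorted(s.items()))):
    """Yield only unique slices, logging each skipped duplicate at debug level."""
    seen = set()
    for stream_slice in stream_slices:
        key = make_hashable(stream_slice)
        if key in seen:
            # Debug-level log makes skipped duplicates visible when troubleshooting.
            logger.debug("Skipping duplicate stream slice: %s", stream_slice)
            continue
        seen.add(key)
        yield stream_slice

result = list(generate_unique([{"a": 1}, {"a": 1}, {"a": 2}]))
assert result == [{"a": 1}, {"a": 2}]
```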

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4bd6048 and 04046f6.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (2 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py

[error] 107-107: mypy error: Returning Any from function declared to return "Hashable" [no-any-return]

⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Analyze (python)
🔇 Additional comments (1)
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (1)

3-3: Import of Hashable type to support new functionality

The addition of Hashable from the typing module aligns perfectly with the new functionality to track unique stream slices. This enables proper type hinting for the hashable representations.

… github.com:airbytehq/airbyte-python-cdk into lazebnyi/generate-partiitons-only-for-unique-slices
Contributor

@maxi297 maxi297 left a comment


Accepting right now to unblock the on-call issue, but I think this will require some tests

@maxi297
Contributor

maxi297 commented Apr 26, 2025

Do we fear this set could grow and cause memory issues at some point? Otherwise, I thought about grouping partition routing, but I fear it would mess up the state

@lazebnyi
Contributor Author

Do we fear this set could increase in size and cause memory issues at some point? Else, I thought about grouping partition routing but I fear it would mess the state

In theory, yes, we could have memory issues, since we store a key for each partition. So maybe we should introduce a flag to skip duplicate validation in cases where we are sure that parent records are unique, or even consider switching off concurrency for them.

Alternatively, we could use a key-value storage for these cases, especially if it’s a tier0 or tier1 user.
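The flag idea floated above could look roughly like this. All names here are hypothetical (this class and its `deduplicate` parameter do not exist in the CDK); it only sketches how deduplication could be made opt-out to avoid the memory cost of the seen-set when parent records are known to be unique.

```python
from typing import Iterable, Iterator

class DedupPartitionGenerator:
    """Sketch: deduplication is optional, so sources with guaranteed-unique
    parent records can skip the seen-set and its memory overhead."""

    def __init__(self, deduplicate: bool = True):
        self._deduplicate = deduplicate

    def generate(self, stream_slices: Iterable[dict]) -> Iterator[dict]:
        seen = set()
        for stream_slice in stream_slices:
            if self._deduplicate:
                key = tuple(sorted(stream_slice.items()))
                if key in seen:
                    continue  # skip duplicate slice
                seen.add(key)
            yield stream_slice

dedup = list(DedupPartitionGenerator().generate([{"id": 1}, {"id": 1}]))
no_dedup = list(DedupPartitionGenerator(deduplicate=False).generate([{"id": 1}, {"id": 1}]))
assert dedup == [{"id": 1}]
assert no_dedup == [{"id": 1}, {"id": 1}]
```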

@lazebnyi lazebnyi merged commit d665ca0 into main Apr 26, 2025
27 checks passed
@lazebnyi lazebnyi deleted the lazebnyi/generate-partiitons-only-for-unique-slices branch April 26, 2025 11:02
@maxi297
Contributor

maxi297 commented Apr 26, 2025

Will be reverted as part of #510 in favor of #509
