poc: migrate to concurrent cursors in declarative package #475


Draft: wants to merge 2 commits into base: main

Conversation

Contributor

@maxi297 maxi297 commented Apr 11, 2025

WIP

Summary by CodeRabbit

  • New Features

    • Enhanced concurrent stream processing with improved partition management and state tracking for more efficient incremental data synchronization.
    • Added detailed logging during stream iteration to boost operational traceability.
  • Refactor

    • Streamlined incremental synchronization workflows and refined cursor validation, resulting in clearer error feedback and more robust configuration handling.
    • Optimized message processing to better regulate state updates.

@maxi297 maxi297 marked this pull request as draft April 11, 2025 06:14
Contributor

coderabbitai bot commented Apr 11, 2025

📝 Walkthrough

This update refines the stream handling and cursor management in the Airbyte CDK. The changes include updating type checks to use ConcurrentPerPartitionCursor, enhancing state management with new methods and logging, adjusting incremental synchronization logic, and introducing a new partition iteration function. Minor formatting and test adjustments were also applied. All changes preserve the overall flow while improving the clarity and efficiency of the concurrent and incremental stream processing.

Changes

File(s) Change Summary
airbyte_cdk/sources/declarative/concurrent_declarative_source.py,
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py,
airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py,
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
Updated cursor handling for incremental sync: replaced checks for PerPartitionWithGlobalCursor/GlobalSubstreamCursor with ConcurrentPerPartitionCursor, added new state management methods, extra model imports, and logging for tracing slices and parent state changes.
airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py Introduced a new function iterate_with_last_flag to yield stream slices with a flag and added a new InMemoryPartition class with helper methods (including stream naming, record reading, slicing, and hashing), plus updated parent stream state retrieval.
airbyte_cdk/sources/declarative/retrievers/retriever.py Modified import statements by consolidating StreamSlice and StreamState imports to a single module, simplifying the dependency structure.
airbyte_cdk/sources/message/repository.py In consume_queue, introduced an intermediate variable before yielding items, and in emit_message, added a check to skip emitting state messages.
airbyte_cdk/sources/streams/concurrent/cursor.py Minor formatting adjustment: added a blank line after the method docstring in stream_slices.
Unit Tests (multiple files):
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py,
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py,
unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_source_builder.py
Updated assertions and test configurations to reflect the new cursor types and state handling; added logging and updated expected state formats; removed methods (close, is_closed) related to partition lifecycle management.

Sequence Diagram(s)

sequenceDiagram
    participant SPR as SubstreamPartitionRouter
    participant ILF as iterate_with_last_flag
    participant IMP as InMemoryPartition
    participant PS as ParentState

    SPR->>ILF: Call iterate_with_last_flag(stream_slices)
    ILF-->>SPR: Yield (stream_slice, is_last)
    alt if is_last == false
        SPR->>IMP: Process stream slice normally
    else
        SPR->>IMP: Process final slice and update PS from stream_slicer.state
    end
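The `iterate_with_last_flag` helper described in the change summary and diagrammed above pairs each stream slice with a flag marking the final one, so the router can update parent state only after the last slice. A minimal sketch of such a helper; the PR's actual signature may differ:

```python
from typing import Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")


def iterate_with_last_flag(iterable: Iterable[T]) -> Iterator[Tuple[T, bool]]:
    """Yield (item, is_last) pairs, flagging the final item of the iterable."""
    iterator = iter(iterable)
    try:
        previous = next(iterator)
    except StopIteration:
        return  # empty input yields nothing
    for current in iterator:
        yield previous, False
        previous = current
    yield previous, True
```

With this shape, the consumer sees `is_last == True` exactly once, on the final slice, which is when the parent state snapshot is taken.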

Possibly related PRs

  • airbytehq/airbyte-python-cdk#111: Introduces and enhances the ConcurrentPerPartitionCursor, directly relating to this update’s cursor type modifications. wdyt?
  • airbytehq/airbyte-python-cdk#343: Enhances state management in ConcurrentPerPartitionCursor with new parameters and methods, aligning with the improvements in this PR. wdyt?
  • airbytehq/airbyte-python-cdk#228: Updates cursor handling and state management for concurrent processing, connecting closely with the type-check update for stream_slicer. wdyt?

Suggested labels

enhancement

Suggested reviewers

  • aaronsteers (Would you like to loop in aaronsteers for their insights on these changes, wdyt?)


Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 9

🧹 Nitpick comments (9)
airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py (1)

42-44: Added informative logging for slice iteration.

This logging statement will be helpful for debugging and monitoring the data flow during slice iteration. It logs both the current slice and its associated state, which provides valuable context for troubleshooting.

Perhaps consider using debug level for this log since it might be quite verbose in production? Something like:

-        logger.info(f"slice: {current}, state: {state}")
+        logger.debug(f"slice: {current}, state: {state}")

What do you think?

🧰 Tools
🪛 GitHub Actions: Linters

[warning] Would reformat: airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py

airbyte_cdk/sources/message/repository.py (1)

95-96: Unnecessary intermediate variable introduced. Could we simplify?

You introduced an intermediate variable x where directly yielding would be simpler. This doesn't affect functionality but slightly affects readability.

-            x = self._message_queue.popleft()
-            yield x
+            yield self._message_queue.popleft()

What do you think? Is there a specific reason for the intermediate variable?

unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (2)

678-678: Helpful comment about inconsistent state format

The comment documents an important issue with the test data having inconsistent formats between partitions.

Consider using more professional language in the comment for clarity and code quality:

-                        "cursor": {"created_at": INITIAL_STATE_PARTITION_10_CURSOR_TIMESTAMP},  # that's a very fucked up state as it has different format for different partition
+                        "cursor": {"created_at": INITIAL_STATE_PARTITION_10_CURSOR_TIMESTAMP},  # inconsistent state format - timestamp versus ISO string in different partitions

What do you think?

🧰 Tools
🪛 GitHub Actions: Linters

[warning] Would reformat: unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py


843-845: Temporary debug logging in test

This adds logging that might be useful during development but is very verbose for normal test runs.

Consider either:

  1. Using a more descriptive message and DEBUG level instead of WARNING
  2. Removing if no longer needed for debugging
  3. Adding conditional logging behind a flag/environment variable
-            import logging
-            logger = logging.getLogger()
-            logger.warning(f"Running tmp state: {state.stream.stream_state.__dict__}")
+            # If debugging is needed:
+            # import logging
+            # logger = logging.getLogger()
+            # logger.debug(f"Processing state: {state.stream.stream_state.__dict__}")

What do you think? Is this still needed?
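Option 3 above, gating the logging behind an environment variable, could be sketched like this. The `DEBUG_CURSOR_STATE` flag name is hypothetical, chosen only for illustration:

```python
import logging
import os


def log_state_if_enabled(state: dict) -> bool:
    """Log the state at DEBUG level only when DEBUG_CURSOR_STATE is set.

    Returns True if a log record was emitted, False otherwise, so tests
    can verify the gating behavior.
    """
    if not os.environ.get("DEBUG_CURSOR_STATE"):
        return False
    logging.getLogger(__name__).debug("Processing state: %s", state)
    return True
```

This keeps normal test runs quiet while still allowing verbose state tracing on demand.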


airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)

584-584: FIXME: removing _disable_resumable_full_refresh?
You left a note pondering removal. Do you want to remove or keep it for a future refactor? wdyt?


1398-1398: Potential parameters propagation issue
The code has a # FIXME about unpropagated parameters. Would you like to address this now or handle in a separate task? wdyt?

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1)

2880-2891: Implemented ConcurrentCursor creation with proper dependencies.

This is a significant addition that properly creates a ConcurrentCursor with all required parameters including mocked state management and message repository. Would it be helpful to extract this cursor creation to a helper method if it's used in multiple tests, wdyt?

🧰 Tools
🪛 GitHub Actions: Linters

[warning] Would reformat: unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py (2)

23-24: Potentially unused imports.
It looks like DefaultStream is imported here but not referenced anywhere in this file. Would you consider removing it to keep the imports clean, wdyt?


255-266: Check for naming consistency and potential reusability.
This block extracts and merges additional pointer-based fields into the slice. The approach looks mostly fine. However, the naming _extract_child_response can be confusing. Would you consider something like _build_child_response for clarity, wdyt?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3e82436 and 9468546.

📒 Files selected for processing (11)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1 hunks)
  • airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (4 hunks)
  • airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py (3 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (9 hunks)
  • airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py (3 hunks)
  • airbyte_cdk/sources/declarative/retrievers/retriever.py (1 hunks)
  • airbyte_cdk/sources/message/repository.py (2 hunks)
  • airbyte_cdk/sources/streams/concurrent/cursor.py (1 hunks)
  • unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (5 hunks)
  • unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (17 hunks)
  • unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_source_builder.py (0 hunks)
💤 Files with no reviewable changes (1)
  • unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_source_builder.py
🧰 Additional context used
🧬 Code Graph Analysis (6)
airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py (3)
airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py (1)
  • logger (468-469)
airbyte_cdk/sources/declarative/partition_routers/cartesian_product_stream_slicer.py (1)
  • logger (175-176)
airbyte_cdk/sources/declarative/retrievers/retriever.py (2)
  • state (37-48)
  • state (52-53)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)
  • ConcurrentPerPartitionCursor (53-517)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (6)
unit_tests/sources/declarative/partition_routers/test_parent_state_stream.py (1)
  • test_incremental_parent_state_no_incremental_dependency (1490-1533)
airbyte_cdk/sources/declarative/retrievers/retriever.py (2)
  • state (37-48)
  • state (52-53)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)
  • state (129-149)
airbyte_cdk/sources/streams/concurrent/cursor.py (3)
  • state (53-53)
  • state (104-105)
  • state (171-174)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (2)
  • state (610-611)
  • state (614-617)
unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py (4)
  • state (90-91)
  • state (94-95)
  • state (212-214)
  • state (217-218)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (3)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2)
  • ConcurrentPerPartitionCursor (53-517)
  • cursor_field (125-126)
airbyte_cdk/sources/declarative/partition_routers/single_partition_router.py (1)
  • SinglePartitionRouter (13-63)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
  • ListPartitionRouter (1805-1824)
airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py (5)
airbyte_cdk/sources/streams/concurrent/default_stream.py (1)
  • DefaultStream (20-99)
airbyte_cdk/sources/types.py (7)
  • partition (90-95)
  • Record (20-63)
  • StreamSlice (66-160)
  • data (34-35)
  • get (137-138)
  • extra_fields (106-108)
  • cursor_slice (98-103)
airbyte_cdk/sources/streams/concurrent/partitions/partition.py (1)
  • Partition (11-48)
airbyte_cdk/sources/declarative/partition_routers/list_partition_router.py (1)
  • stream_slices (85-91)
airbyte_cdk/sources/declarative/partition_routers/cartesian_product_stream_slicer.py (1)
  • logger (175-176)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)
airbyte_cdk/models/airbyte_protocol.py (3)
  • AirbyteStateBlob (17-52)
  • AirbyteStateMessage (69-77)
  • AirbyteStreamState (57-59)
airbyte_cdk/sources/streams/concurrent/cursor.py (1)
  • Cursor (50-82)
airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py (1)
  • get_stream_state (168-184)
airbyte_cdk/sources/connector_state_manager.py (1)
  • get_stream_state (53-67)
🪛 GitHub Actions: Linters
airbyte_cdk/sources/streams/concurrent/cursor.py

[warning] Would reformat: airbyte_cdk/sources/streams/concurrent/cursor.py

airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py

[warning] Would reformat: airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py

unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py

[warning] Would reformat: unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

[warning] Would reformat: airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

[warning] Would reformat: unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py

[error] 50-50: Function is missing a type annotation [no-untyped-def]


[error] 55-55: Returning Any from function declared to return 'str' [no-any-return]


[error] 61-61: Returning Any from function declared to return 'Mapping[str, Any] | None' [no-any-return]


[error] 67-67: 'InMemoryPartition' has no attribute '_name' [attr-defined]


[error] 69-69: 'InMemoryPartition' has no attribute '_name' [attr-defined]


[error] 221-221: Argument 1 to 'iterate_with_last_flag' has incompatible type 'Iterable[StreamSlice | None]'; expected 'Iterable[StreamSlice]' [arg-type]


[error] 222-222: Argument 1 to 'iterate_with_last_flag' has incompatible type 'Iterable[Mapping[str, Any] | AirbyteMessage]'; expected 'Iterable[StreamSlice]' [arg-type]


[error] 223-223: 'Retriever' has no attribute 'stream_slicer'; maybe 'stream_slices'? [attr-defined]


[error] 224-224: 'Retriever' has no attribute 'stream_slicer'; maybe 'stream_slices'? [attr-defined]


[error] 241-241: Incompatible types in assignment (expression has type 'Mapping[str, Any]', variable has type 'StreamSlice') [assignment]


[error] 276-276: 'Retriever' has no attribute 'stream_slicer'; maybe 'stream_slices'? [attr-defined]


[error] 277-277: 'Retriever' has no attribute 'stream_slicer'; maybe 'stream_slices'? [attr-defined]


[error] 277-277: Call to untyped function 'InMemoryPartition' in typed context [no-untyped-call]


[error] 278-278: 'Retriever' has no attribute 'stream_slicer'; maybe 'stream_slices'? [attr-defined]


[error] 464-464: 'Retriever' has no attribute 'stream_slicer'; maybe 'stream_slices'? [attr-defined]

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

[error] 1904-1904: Argument 'stream_name' to 'get_stream_state' of 'ConnectorStateManager' has incompatible type 'str | None'; expected 'str' [arg-type]


[error] 3283-3283: Argument 2 to 'get_parent_state' of 'ConcurrentPerPartitionCursor' has incompatible type 'str | None'; expected 'str' [arg-type]

⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Analyze (python)
🔇 Additional comments (30)
airbyte_cdk/sources/streams/concurrent/cursor.py (1)

77-83: Added blank line after docstring improves readability.

Nice addition of the blank line after the docstring. This follows PEP 8 style guidelines which recommend blank lines after docstrings, wdyt?


airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)

364-365:

✅ Verification successful

Updated stream_slicer type check to use ConcurrentPerPartitionCursor.

This change aligns with the PR's objective of migrating to concurrent cursors by replacing the PerPartitionWithGlobalCursor check with ConcurrentPerPartitionCursor. The change ensures proper type checking for the cursor implementation while maintaining the same control flow.


🏁 Script executed:

#!/bin/bash
# Description: Check for any remaining usages of PerPartitionWithGlobalCursor that might need updating

# Look for any other references to PerPartitionWithGlobalCursor
echo "Checking for remaining references to PerPartitionWithGlobalCursor:"
rg "PerPartitionWithGlobalCursor" --type python -A 2 -B 2 --glob "!**/test_*.py"

Length of output: 254


The first command failed because ripgrep does not recognize --type python, so the search needs a glob filter for Python files instead. Re-running it with -g "*.py" (excluding test files) will confirm whether any other references to PerPartitionWithGlobalCursor remain. If none turn up, the swap from PerPartitionWithGlobalCursor to ConcurrentPerPartitionCursor is isolated and correct, wdyt?


🏁 Script executed:

#!/bin/bash
# Re-run the reference search using a glob filter for Python files.
echo "Checking for any remaining references to PerPartitionWithGlobalCursor using glob filter for *.py files:"
rg "PerPartitionWithGlobalCursor" -g "*.py" -A 2 -B 2 --glob "!**/test_*.py"

Length of output: 4408


Type check update verified

  • The change in airbyte_cdk/sources/declarative/concurrent_declarative_source.py now properly updates the stream slicer type check to use ConcurrentPerPartitionCursor as intended.
  • Verification confirms that while other parts of the codebase still reference PerPartitionWithGlobalCursor (in incremental modules and record filter), these remain unaffected by this specific change.
  • The update aligns with the PR’s objective of migrating to concurrent cursors.

Would you like to review or address the remaining references to PerPartitionWithGlobalCursor in other modules, or are they intended to be left as is? wdyt?

airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py (1)

4-15: Added logging import and instantiation.

Good addition of logging for better observability. The logger is properly set up with the module name as context, which follows best practices.


airbyte_cdk/sources/declarative/retrievers/retriever.py (1)

9-9: Updated imports to use StreamSlice from airbyte_cdk.sources.types.

Good refactoring of imports to get StreamSlice and StreamState from a centralized location rather than from a specific implementation. This improves maintainability and follows the principle of getting types from their most appropriate location.

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (5)

14-20: LGTM! New imports for improved state handling.

These imports from airbyte_cdk.models are needed for the new state handling functionality.



25-25: Good warning about potential circular imports.

Thanks for documenting the potential circular import issue.



271-271: LGTM! Parent state update after stream slices generation.

This line appropriately updates the parent state after processing all stream slices.



508-508: LGTM! Appropriate spacing before new method.

Good code formatting with blank line before the new static method.



509-517:

✅ Verification successful

LGTM! Useful helper for retrieving parent state

This static method provides a clean way to retrieve parent state for a given stream and construct an appropriate AirbyteStateMessage.


🏁 Script executed:

#!/bin/bash
# Check where this new method is being used in the codebase
rg -A 3 "get_parent_state" --type py

Length of output: 3128


LGTM! The get_parent_state static method is a clean and useful addition for retrieving the parent state. I verified its usage in both the unit tests and in the model_to_component_factory.py, and everything appears consistent with its intended design. The method integrates well with the existing state management logic. wdyt?
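For illustration, the shape of a `get_parent_state`-style helper, using simplified dataclass stand-ins for the `airbyte_cdk.models` state classes; the real signature and field names in the PR may differ:

```python
from dataclasses import dataclass
from typing import Any, Mapping, Optional


@dataclass
class StreamDescriptor:  # stand-in for the airbyte_cdk.models class
    name: str


@dataclass
class AirbyteStreamState:  # stand-in for the airbyte_cdk.models class
    stream_descriptor: StreamDescriptor
    stream_state: Mapping[str, Any]


def get_parent_state(
    state: Optional[Mapping[str, Any]], parent_stream_name: str
) -> Optional[AirbyteStreamState]:
    """Pull one parent stream's state out of a per-partition cursor state blob."""
    if not state:
        return None
    parent_state = state.get("parent_state", {}).get(parent_stream_name)
    if parent_state is None:
        return None
    return AirbyteStreamState(
        stream_descriptor=StreamDescriptor(name=parent_stream_name),
        stream_state=parent_state,
    )
```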


unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (3)

745-745: LGTM! Additional context comment is helpful

The comment adds useful context about the path structure being tested.



875-882: LGTM! Improved assertion logic for single vs multiple expected states

Good improvement to the assertion logic to handle both single and multiple expected states appropriately.



1145-1147: LGTM! Added state field to expected state

This addition makes the test more robust by verifying the state's updated_at timestamp.


airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (5)

30-38: New imports look fine.
They appear consistent with the rest of the code. Do you want to confirm there are no naming conflicts or unused references? wdyt?


543-543: New concurrency imports
Looks like you’ve added concurrent state management classes. Do you want to verify everything is indeed utilized and no extraneous imports remain? wdyt?


1907-1914: Check type compatibility: returning ConcurrentCursor
We’re returning a ConcurrentCursor while the signature expects a StreamSlicer. Although they share methods, is this fully type-safe? wdyt?


1954-1954: Double-checking model.name usage
We pass model.name to stream_name again here. Do you want to confirm that it’s never None before usage? wdyt?


1960-1960: Handling a potentially None stream_slicer
We return stream_slicer, but if it’s None, wouldn’t that cause issues later on? Should we guard against it? wdyt?

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (13)

8-8: Added import for Mock - nice addition for testing.

The addition of unittest.mock.Mock is a good improvement for testing components by allowing proper mocking of dependencies. This aligns well with the cursor type changes.



15-15: Updated import to include InMemoryMessageRepository.

Added InMemoryMessageRepository to allow testing the new message handling in concurrent cursors. This is a logical addition given the changes to cursor handling.



46-47: Updated cursor imports to include ConcurrentPerPartitionCursor.

This import change aligns with the migration to concurrent cursors in the declarative package. The new cursor type will enable better concurrent execution of partitioned streams.



169-170: Updated imports for concurrent cursors.

Adding the necessary imports for ConcurrentCursor and CursorField from the concurrent package, which will be used in the updated cursor implementation tests.



190-190: Added start_time to input_config.

Adding a future-dated start_time to the shared input_config is a good approach as it's used across multiple tests with datetime-based cursors.



909-910: Updated stream_slicer type assertion to ConcurrentPerPartitionCursor.

This change correctly updates the expected cursor type from PerPartitionWithGlobalCursor to ConcurrentPerPartitionCursor, which is consistent with the migration to concurrent cursors.



913-915: Updated cursor factory test to expect ConcurrentCursor.

The assertion now correctly expects a ConcurrentCursor instance returned from the cursor factory, aligning with the concurrent cursor migration.



1031-1032: Updated cursor type assertion in resumable_full_refresh_stream test.

Changed the assertion to expect SinglePartitionRouter which is appropriate for this test case. This is part of the consistent update to use the new cursor types throughout the test suite.



1290-1291: Updated cursor type check in client_side_incremental_with_partition_router test.

Correctly updated the assertion to expect ConcurrentPerPartitionCursor instead of the previous implementation, keeping the test aligned with the ongoing concurrent cursor migration.


2474-2475: Modified test case to expect ConcurrentPerPartitionCursor.

This change in the parameterized test ensures that when both incremental and partition router are used, the factory creates a ConcurrentPerPartitionCursor as expected.


2499-2500: Updated expected type for partition routers with multiple components.

Changed the expected type to ConcurrentPerPartitionCursor which is correct for the scenario with multiple partition routers and an incremental component.


2546-2547: Updated expected type for test with partition router only.

Changed to expect ListPartitionRouter directly, which is appropriate when only a partition router is provided without incremental sync.


2924-2925: Updated assertion to check for ConcurrentCursor type.

This properly verifies the retriever's stream_slicer is a ConcurrentCursor instance, aligning with the concurrent cursor migration.

Comment on lines +111 to +112
if message.type == Type.STATE:
return # FIXME this is horribly dumb but allows me to test not emitting state messages. We can probably create another decorator that filters and set this only for substream partition router
⚠️ Potential issue

Temporary code preventing STATE messages from being emitted

This change prevents all state messages from being emitted, which could lead to data loss or repeated data if a sync is interrupted in production.

The FIXME comment indicates this is temporary, but it's a risky change to merge without a proper filtering mechanism in place.

Would creating a proper decorator to filter state messages for specific cases (as your comment suggests) be a better approach before merging this change?
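A filtering decorator along those lines could look like this minimal sketch. The `Message` dataclass and handler names here are hypothetical stand-ins for illustration, not the CDK's actual classes:

```python
from dataclasses import dataclass
from functools import wraps
from typing import Callable, List

@dataclass
class Message:
    # Hypothetical stand-in for the CDK's AirbyteMessage; only the type field matters here.
    type: str  # e.g. "STATE" or "RECORD"

def filter_state_messages(handler: Callable[[Message], None]) -> Callable[[Message], None]:
    """Drop STATE messages before they reach the wrapped handler."""
    @wraps(handler)
    def wrapper(message: Message) -> None:
        if message.type == "STATE":
            return  # suppress state emission for this handler only
        handler(message)
    return wrapper

seen: List[str] = []

@filter_state_messages
def handle(message: Message) -> None:
    seen.append(message.type)

for m in (Message("RECORD"), Message("STATE"), Message("RECORD")):
    handle(m)
# seen is now ["RECORD", "RECORD"]
```

This keeps the suppression opt-in per call site (e.g. only for the substream partition router) instead of a blanket early return in the shared message-processing path.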

@@ -289,6 +297,7 @@ def _generate_slices_from_partition(
]
!= parent_state
):
print(f"GODO:\n\t{parent_state}") # FIXME parent state needs to be tracked in substream partition router
⚠️ Potential issue

Debug print statement needs to be removed before merging

There's a leftover debug print statement (note the GODO marker) that should be removed before this is merged.

-                print(f"GODO:\n\t{parent_state}")  # FIXME parent state needs to be tracked in substream partition router
+                # parent state needs to be tracked in substream partition router

Would you like to convert this to a proper logging statement instead?

🧰 Tools
🪛 GitHub Actions: Linters

[warning] Would reformat: airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

@@ -1908,52 +1900,29 @@ def _build_incremental_cursor(
stream_slicer: Optional[PartitionRouter],
config: Config,
) -> Optional[StreamSlicer]:
stream_state = self._connector_state_manager.get_stream_state(
stream_name=model.name, namespace=None
⚠️ Potential issue

Pipeline failure: model.name can be None
The type checker complains about a possible None for stream_name. Should we enforce a default or validate model.name is non-null? wdyt?

Possible fix:

- stream_name=model.name, namespace=None
+ stream_name=model.name if model.name else "<default_stream_name>", namespace=None
🧰 Tools
🪛 GitHub Actions: Linters

[error] 1904-1904: Argument 'stream_name' to 'get_stream_state' of 'ConnectorStateManager' has incompatible type 'str | None'; expected 'str' [arg-type]
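An alternative to substituting a placeholder default is to fail fast. A small validation helper (hypothetical name, not an existing CDK function) could satisfy the type checker while surfacing a clear error:

```python
from typing import Optional

def require_stream_name(name: Optional[str]) -> str:
    """Raise a clear error instead of passing None (or a fake default) downstream."""
    if not name:
        raise ValueError("Declarative stream model has no name; cannot look up stream state.")
    return name

# At the call site (sketch):
# stream_state = self._connector_state_manager.get_stream_state(
#     stream_name=require_stream_name(model.name), namespace=None
# )

ok = require_stream_name("users")
try:
    require_stream_name(None)
    raised = False
except ValueError:
    raised = True
```

This narrows `str | None` to `str` for mypy and avoids silently tracking state under a fabricated stream name.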

Comment on lines 3283 to 3287
parent_state = ConcurrentPerPartitionCursor.get_parent_state(self._connector_state_manager.get_stream_state(kwargs["stream_name"], None), model.stream.name)
connector_state_manager = ConnectorStateManager([parent_state] if parent_state else [])
else:
connector_state_manager = ConnectorStateManager([])

⚠️ Potential issue

Pipeline failure: model.stream.name could be None
ConcurrentPerPartitionCursor.get_parent_state expects a non-null string. Should we default to an empty string if model.stream.name is None? wdyt?

Example fix:

- parent_state = ConcurrentPerPartitionCursor.get_parent_state(..., model.stream.name)
+ safe_name = model.stream.name if model.stream.name else "<default_stream_name>"
+ parent_state = ConcurrentPerPartitionCursor.get_parent_state(..., safe_name)
🧰 Tools
🪛 GitHub Actions: Linters

[error] 3283-3283: Argument 2 to 'get_parent_state' of 'ConcurrentPerPartitionCursor' has incompatible type 'str | None'; expected 'str' [arg-type]

Comment on lines 32 to 46
def iterate_with_last_flag(generator: Iterable[StreamSlice]) -> Iterable[tuple[StreamSlice, bool]]:

iterator = iter(generator)

try:
current = next(iterator)
except StopIteration:
return # Return an empty iterator

for next_item in iterator:
yield current, False
current = next_item

yield current, True

🛠️ Refactor suggestion

Reconsider the expected input type for iterate_with_last_flag.
You’ve strongly typed the function to accept an Iterable[StreamSlice], yet you call it with record iterables further below (lines 221-222). This is causing a type mismatch in the pipeline. Would you consider either broadening the function type (e.g., a generic iterable) or creating a separate helper for record iteration, wdyt?
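One way to broaden the signature, as a sketch: make the helper generic over the element type with a `TypeVar`, so both slice iterables and record iterables type-check without duplicating the function:

```python
from typing import Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")

def iterate_with_last_flag(items: Iterable[T]) -> Iterator[Tuple[T, bool]]:
    """Yield (item, is_last) pairs for any iterable, not just Iterable[StreamSlice]."""
    iterator = iter(items)
    try:
        current = next(iterator)
    except StopIteration:
        return  # empty input yields nothing
    for nxt in iterator:
        yield current, False
        current = nxt
    yield current, True

pairs = list(iterate_with_last_flag(["a", "b", "c"]))
# pairs == [("a", False), ("b", False), ("c", True)]
```

The body is unchanged from the version under review; only the annotations widen, so the `Iterable[StreamSlice | None]` and record-iterable call sites would both be accepted.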

Comment on lines 221 to 254
for _slice, is_last_slice in iterate_with_last_flag(parent_stream.retriever.stream_slices()):
for parent_record, is_last_record_in_slice in iterate_with_last_flag(parent_stream.retriever.read_records({}, _slice)):
if hasattr(parent_stream.retriever.stream_slicer, "observe"): # FIXME it seems like a dumb way to access the method
parent_stream.retriever.stream_slicer.observe(parent_record)
parent_partition = None
# Skip non-records (eg AirbyteLogMessage)
if isinstance(parent_record, AirbyteMessage):
self.logger.warning(
f"Parent stream {parent_stream.name} returns records of type AirbyteMessage. This SubstreamPartitionRouter is not able to checkpoint incremental parent state."
)
if parent_record.type == MessageType.RECORD:
parent_record = parent_record.record.data # type: ignore[union-attr, assignment] # record is always a Record
else:
continue
elif isinstance(parent_record, Record):
parent_partition = (
parent_record.associated_slice.partition
if parent_record.associated_slice
else {}
)
parent_record = parent_record.data
elif not isinstance(parent_record, Mapping):
# The parent_record should only take the form of a Record, AirbyteMessage, or Mapping. Anything else is invalid
raise AirbyteTracedException(
message=f"Parent stream returned records as invalid type {type(parent_record)}"
)
try:
partition_value = dpath.get(
parent_record, # type: ignore [arg-type]
parent_field,
)
except KeyError:
continue
elif isinstance(parent_record, Record):
parent_partition = (
parent_record.associated_slice.partition
if parent_record.associated_slice
else {}
)
parent_record = parent_record.data
elif not isinstance(parent_record, Mapping):
# The parent_record should only take the form of a Record, AirbyteMessage, or Mapping. Anything else is invalid
raise AirbyteTracedException(
message=f"Parent stream returned records as invalid type {type(parent_record)}"
)
try:
partition_value = dpath.get(
parent_record, # type: ignore [arg-type]
parent_field,

🛠️ Refactor suggestion

⚠️ Potential issue

Type mismatch when applying iterate_with_last_flag to record iteration.
It seems you’re passing parent_stream.retriever.stream_slices() and read_records(...) directly to iterate_with_last_flag, which is typed to handle Iterable[StreamSlice]. Meanwhile, your second call (line 222) returns an iterable of parent records (e.g., Mapping[str, Any] | AirbyteMessage). This triggers a pipeline error. Would you consider either creating a separate helper for record iteration or adjusting the function’s type signature, wdyt?

Additionally, references to parent_stream.retriever.stream_slicer may be invalid if retriever doesn’t store a stream_slicer attribute. Possibly you intended to call something like parent_stream.stream_slicer? You might need to adjust your references or verify parent_stream.retriever indeed has a stream_slicer property, wdyt?

🧰 Tools
🪛 GitHub Actions: Linters

[error] 221-221: Argument 1 to 'iterate_with_last_flag' has incompatible type 'Iterable[StreamSlice | None]'; expected 'Iterable[StreamSlice]' [arg-type]


[error] 222-222: Argument 1 to 'iterate_with_last_flag' has incompatible type 'Iterable[Mapping[str, Any] | AirbyteMessage]'; expected 'Iterable[StreamSlice]' [arg-type]


[error] 223-223: 'Retriever' has no attribute 'stream_slicer'; maybe 'stream_slices'? [attr-defined]


[error] 224-224: 'Retriever' has no attribute 'stream_slicer'; maybe 'stream_slices'? [attr-defined]


[error] 241-241: Incompatible types in assignment (expression has type 'Mapping[str, Any]', variable has type 'StreamSlice') [assignment]

Comment on lines 267 to 278
yield StreamSlice(
partition={
partition_field: partition_value,
"parent_slice": parent_partition or {},
},
cursor_slice={},
extra_fields=extracted_extra_fields,
)
except KeyError:
continue

# Add extra fields
extracted_extra_fields = self._extract_extra_fields(parent_record, extra_fields)

if parent_stream_config.lazy_read_pointer:
extracted_extra_fields = {
"child_response": self._extract_child_response(
parent_record,
parent_stream_config.lazy_read_pointer, # type: ignore[arg-type] # lazy_read_pointer type handeled in __post_init__ of parent_stream_config
),
**extracted_extra_fields,
}

yield StreamSlice(
partition={
partition_field: partition_value,
"parent_slice": parent_partition or {},
},
cursor_slice={},
extra_fields=extracted_extra_fields,
)

if is_last_record_in_slice and hasattr(parent_stream.retriever.stream_slicer, "close_partition"): # FIXME it seems like a dumb way to access the method
parent_stream.retriever.stream_slicer.close_partition(InMemoryPartition(parent_stream.name, _slice))
parent_stream.retriever.stream_slicer.ensure_at_least_one_state_emitted()
⚠️ Potential issue

Closing partition references stream_slicer again.
This block calls close_partition(...) on parent_stream.retriever.stream_slicer, which might not exist if retriever has no stream_slicer attribute. Please confirm you’re referencing the correct property or method. Would you consider aligning this call with your actual retriever capabilities, wdyt?

🧰 Tools
🪛 GitHub Actions: Linters

[error] 276-276: 'Retriever' has no attribute 'stream_slicer'; maybe 'stream_slices'? [attr-defined]


[error] 277-277: 'Retriever' has no attribute 'stream_slicer'; maybe 'stream_slices'? [attr-defined]


[error] 277-277: Call to untyped function 'InMemoryPartition' in typed context [no-untyped-call]


[error] 278-278: 'Retriever' has no attribute 'stream_slicer'; maybe 'stream_slices'? [attr-defined]

@@ -414,7 +461,7 @@
parent_state = {}
for parent_config in self.parent_stream_configs:
if parent_config.incremental_dependency:
parent_state[parent_config.stream.name] = copy.deepcopy(parent_config.stream.state)
parent_state[parent_config.stream.name] = copy.deepcopy(parent_config.stream.retriever.stream_slicer.state) # FIXME move to default stream
⚠️ Potential issue

Invalid stream_slicer usage in get_stream_state.
On line 464, you copy the state from parent_config.stream.retriever.stream_slicer.state. If retriever doesn’t define stream_slicer, this will fail at runtime. Would you consider referencing a valid interface instead, or ensuring the retriever does indeed expose a stream_slicer attribute, wdyt?

🧰 Tools
🪛 GitHub Actions: Linters

[error] 464-464: 'Retriever' has no attribute 'stream_slicer'; maybe 'stream_slices'? [attr-defined]

Comment on lines 48 to 70
class InMemoryPartition(Partition):

def __init__(self, stream_name, _slice):
self._stream_name = stream_name
self._slice = _slice

def stream_name(self) -> str:
return self._stream_name

def read(self) -> Iterable[Record]:
yield from []

def to_slice(self) -> Optional[Mapping[str, Any]]:
return self._slice

def __hash__(self) -> int:
if self._slice:
# Convert the slice to a string so that it can be hashed
s = json.dumps(self._slice, sort_keys=True)
return hash((self._name, s))
else:
return hash(self._name)

⚠️ Potential issue

Fix attribute name and add missing type annotations.

  1. The reference to self._name in your hash method (lines 67 and 69) is causing errors because _name does not exist. The constructor stores self._stream_name.
  2. The constructor (line 50) and other methods are missing type annotations, which is causing pipeline lint errors.

Would you consider something like this? wdyt?

 class InMemoryPartition(Partition):

-    def __init__(self, stream_name, _slice):
+    def __init__(self, stream_name: str, _slice: Optional[Mapping[str, Any]]) -> None:
         self._stream_name = stream_name
         self._slice = _slice

     def stream_name(self) -> str:
         return self._stream_name

     def read(self) -> Iterable[Record]:
         yield from []

     def to_slice(self) -> Optional[Mapping[str, Any]]:
         return self._slice

     def __hash__(self) -> int:
         if self._slice:
             s = json.dumps(self._slice, sort_keys=True)
-            return hash((self._name, s))
+            return hash((self._stream_name, s))
         else:
-            return hash(self._name)
+            return hash(self._stream_name)
🧰 Tools
🪛 GitHub Actions: Linters

[error] 50-50: Function is missing a type annotation [no-untyped-def]


[error] 55-55: Returning Any from function declared to return 'str' [no-any-return]


[error] 61-61: Returning Any from function declared to return 'Mapping[str, Any] | None' [no-any-return]


[error] 67-67: 'InMemoryPartition' has no attribute '_name' [attr-defined]


[error] 69-69: 'InMemoryPartition' has no attribute '_name' [attr-defined]
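To sanity-check the corrected hash contract, here is a stripped-down standalone version (the CDK's `Partition` base class and `read()` are omitted) showing that the same stream name plus an equal slice hash identically regardless of key order:

```python
import json
from typing import Any, Mapping, Optional

class InMemoryPartition:
    def __init__(self, stream_name: str, _slice: Optional[Mapping[str, Any]]) -> None:
        self._stream_name = stream_name
        self._slice = _slice

    def to_slice(self) -> Optional[Mapping[str, Any]]:
        return self._slice

    def __hash__(self) -> int:
        if self._slice:
            # Sorted-key JSON makes equal mappings serialize, and therefore hash, identically
            s = json.dumps(self._slice, sort_keys=True)
            return hash((self._stream_name, s))
        return hash(self._stream_name)

a = InMemoryPartition("users", {"id": 1, "region": "eu"})
b = InMemoryPartition("users", {"region": "eu", "id": 1})
# hash(a) == hash(b) despite the different key insertion order
```

With the `_name` references fixed to `_stream_name`, partitions behave as expected when used as dictionary keys or set members.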
