-
Notifications
You must be signed in to change notification settings - Fork 15
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(Low-Code CDK Property Chunking): Allow fetching query properties and property chunking for low-code sources #452
base: main
Are you sure you want to change the base?
Conversation
… bug around character chunking
…nd fix bug emitting as Record instead of mapping
📝 WalkthroughWalkthroughThis pull request introduces several new components and related logic to enhance the declarative schema used for handling record merging and property extraction from APIs. New YAML definitions and corresponding Python classes (such as Changes
Sequence Diagram(s)sequenceDiagram
participant SR as SimpleRetriever
participant QP as QueryProperties
participant PC as PropertyChunking
participant R as Retriever
SR->>QP: Check for additional query properties
alt QueryProperties exists
QP->>PC: Retrieve property chunks
PC-->>QP: Return property chunks
loop For each property chunk
SR->>R: Request records for current chunk
R-->>SR: Return records
end
SR->>SR: Merge records based on key
else No QueryProperties
SR->>R: Retrieve records normally
R-->>SR: Return records
end
Possibly related PRs
Suggested labels
What do you think about these updates? Do they capture everything you intended? ✨ Finishing Touches
🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (28)
unit_tests/sources/declarative/requesters/query_properties/test_group_by_key.py (1)
29-34
: Would a non-existent key be clearer than an empty string for this test case?The test is validating the behavior when a key isn't present in the record data, but using
[""]
(empty string) as the key might be confusing. Perhaps using a key that clearly doesn't exist in the record data (like["non_existent_key"]
) would make the test's intention more obvious? This would more explicitly test that the method returnsNone
when a key isn't found in the record. wdyt?airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py (1)
87-108
: Good validation checks, but could the error messages be more developer-friendly?The validation logic is thorough and correctly checks for the presence and type of query properties. I like how you're doing extensive validation before trying to use the properties.
One minor suggestion: all error messages include "Please contact Airbyte Support" which might not be necessary for developer-facing errors. Could these messages be more actionable for developers working with the codebase, wdyt?
- "stream_slice should not be None if query properties in requests is enabled. Please contact Airbyte Support" + "stream_slice should not be None if query properties in requests is enabled"And similar changes for the other error messages.
unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py (2)
15-58
: Nicely structured parametric tests.
These scenarios comprehensively cover various property chunking constraints and inclusion behaviors. Would you consider adding an extra scenario with a single-element list to confirm the code’s behavior in edge cases, wdyt?
87-99
: Check additional record fields in merge logic test?
Testingget_merge_key
with a single key is good, but what about verifying how it behaves if the record has no"id"
or has nested fields? Might be worth exploring more complex data. wdyt?airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (3)
94-94
: Expose optional query properties.
Declaringadditional_query_properties
as optional is flexible for different user scenarios, which is great. Would you consider clarifying the docstring to detail the typical usage of this attribute, wdyt?
453-465
: Conditional logic foradditional_query_properties
.
This check correctly distinguishes between regular single-chunk and multi-chunk scenarios. Have you considered logging or tracking when multiple property chunks are used for debugging or traceability, wdyt?
469-523
: Merging records across property chunks.
The approach to accumulate partial records inmerged_records
is powerful. One potential improvement: do you want to handle collisions for fields with conflicting data (e.g., if two chunks have different values for the same key), wdyt?airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py (1)
13-18
: Consider expanding docstring details.
The docstring is minimal. Would adding examples or clarifying usage scenarios help future maintainers, wdyt?airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py (1)
14-20
: Could you enhance the enum docstring?The current docstring for
PropertyLimitType
is very minimal ("yeah"). Consider replacing it with a more descriptive explanation of what these limit types represent and how they affect property chunking, wdyt?class PropertyLimitType(Enum): """ - yeah + Defines the type of limit to apply when chunking properties. + + 'characters': Limit based on the total number of characters in property names + 'property_count': Limit based on the number of properties """ characters = "characters" property_count = "property_count"airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py (2)
1-2
: Adding copyright notice is great! Small year correction needed.I noticed the copyright year is set as 2025, which is likely a typo. Would it make sense to change this to 2023 or 2024 instead? wdyt?
-# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
28-43
: Simple suggestion for the property yielding logicThe method implementation looks good! For line 42, could we simplify this slightly? The current
yield from [list(fields)]
is creating a list within a list, and then yielding from it. Would it be cleaner to justyield list(fields)
directly? wdyt?- yield from [list(fields)] + yield list(fields)unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py (3)
91-118
: Well-implemented dynamic endpoint testI like the use of Mock to test the dynamic endpoint case without actual network calls. One small suggestion - would it be valuable to also verify that
get_properties_from_endpoint
is called with the correctstream_slice
parameter? This would ensure the proper context is passed through. wdyt?# Add this assertion after line 113 properties_from_endpoint_mock.get_properties_from_endpoint.assert_called_once_with(stream_slice=stream_slice)
120-165
: Good parameterized testing for has_multiple_chunksThe parameterized test for
has_multiple_chunks
provides good coverage. One small detail - both test cases have the same id "test_has_multiple_chunks" (lines 127 and 131). Would it be clearer to use more specific ids like "with_multiple_chunks" and "with_single_chunk" to differentiate them better in test reports? wdyt?pytest.param( 5, True, - id="test_has_multiple_chunks", + id="with_multiple_chunks", ), pytest.param( 10, False, - id="test_has_multiple_chunks", + id="with_single_chunk", ),
18-166
: Consider adding tests for edge casesThe tests look comprehensive for the happy paths! Would it be valuable to add a couple of edge cases?
- A test for when
property_list
isNone
or an empty list- A test for when
property_chunking
isNone
These would ensure complete coverage of the code paths in the implementation. wdyt?
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (5)
1047-1059
: New Merge Strategy: EmitPartialRecordMergeStrategy
The definition forEmitPartialRecordMergeStrategy
is clear and follows our schema pattern well. Have you thought about adding specific test cases to verify how it handles a record split across multiple API requests, especially given that it’s intended for records without a unique identifier? wdyt?
1768-1790
: Clarification on GroupByKeyMergeStrategy Implementation
The newGroupByKeyMergeStrategy
appears to support grouping records based on a key that can be either a single string or an array. Could you elaborate on how composite keys (arrays) will be processed during record merging? Also, have you added tests to ensure both single-field and multi-field keys behave as expected? wdyt?
3031-3060
: Introducing PropertiesFromEndpoint
This block defines a way to dynamically fetch a list of properties via an API endpoint, which is a neat addition. Would you consider enhancing the documentation or examples to cover more complex cases (for instance, when additional parameters are involved)? Also, do you have tests planned to verify that theretriever
integration works seamlessly? wdyt?
3061-3090
: New Property Chunking Component
ThePropertyChunking
component clearly outlines how to limit the number of properties per request and the merging strategy. Would it be useful to include an example or note clarifying how differentproperty_limit_type
values (such ascharacters
versusproperty_count
) affect the chunking behavior? wdyt?
3091-3121
: QueryProperties Component Addition
TheQueryProperties
component nicely unifies the concept of static property lists and dynamic retrieval viaPropertiesFromEndpoint
. How do you envision the interplay betweenalways_include_properties
andproperty_chunking
in edge cases where API limitations are stringent? Perhaps a few more examples or tests could clarify this behavior. wdyt?airbyte_cdk/sources/declarative/models/declarative_component_schema.py (3)
350-353
: Consider adding a docstring.
Would you like to add a short docstring explaining howEmitPartialRecordMergeStrategy
is intended to merge partial records? wdyt?
1215-1235
: Address minor grammar nit.
In the field description, "Dictates how to records..." might be missing a word. Maybe "Dictates how records requiring multiple requests..." wdyt?
2325-2337
: Consider error handling for missing fields.
Would you like to handle the scenario whereproperty_field_path
is absent or doesn't exist in the response? wdyt?unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1)
4368-4384
: Should the pytest.raises statement include the expected exception type?The test is correctly checking that an invalid property limit type raises an exception, but the
pytest.raises
statement doesn't specify which exception type is expected. Should this bewith pytest.raises(ValueError):
or another specific exception type, wdyt?- with pytest.raises: + with pytest.raises(ValueError):airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (5)
808-813
: New create_emit_partial_record method
The method is straightforward. Would you consider adding a small docstring explaining usage? wdyt?
2142-2145
: New create_group_by_key method
Implementation looks okay. Do you want to check ifmodel.key
is non-empty or handle unexpected values? wdyt?
2151-2151
: Adding optional query_properties_key param
This extra parameter is helpful for hooking up query properties. Perhaps document its purpose in a docstring? wdyt?
2886-2920
: Handling QueryProperties in request_parameters
The logic to accept only oneQueryProperties
is clear. Would you like to refactor this into a helper method for readability or plan for multiple definitions later? wdyt?
3041-3051
: New _remove_query_properties helper
Implementation is straightforward. Would you consider logging or handling the case of multiple matching parameters? wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (21)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml
(4 hunks)airbyte_cdk/sources/declarative/models/declarative_component_schema.py
(7 hunks)airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
(14 hunks)airbyte_cdk/sources/declarative/requesters/query_properties/__init__.py
(1 hunks)airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py
(1 hunks)airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py
(1 hunks)airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py
(1 hunks)airbyte_cdk/sources/declarative/requesters/query_properties/strategies/__init__.py
(1 hunks)airbyte_cdk/sources/declarative/requesters/query_properties/strategies/emit_partial_record.py
(1 hunks)airbyte_cdk/sources/declarative/requesters/query_properties/strategies/group_by_key.py
(1 hunks)airbyte_cdk/sources/declarative/requesters/query_properties/strategies/merge_strategy.py
(1 hunks)airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py
(3 hunks)airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
(5 hunks)unit_tests/sources/declarative/parsers/test_model_to_component_factory.py
(3 hunks)unit_tests/sources/declarative/requesters/query_properties/__init__.py
(1 hunks)unit_tests/sources/declarative/requesters/query_properties/test_group_by_key.py
(1 hunks)unit_tests/sources/declarative/requesters/query_properties/test_properties_from_endpoint.py
(1 hunks)unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py
(1 hunks)unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py
(1 hunks)unit_tests/sources/declarative/requesters/request_options/test_interpolated_request_options_provider.py
(2 hunks)unit_tests/sources/declarative/retrievers/test_simple_retriever.py
(4 hunks)
🧰 Additional context used
🧬 Code Definitions (14)
airbyte_cdk/sources/declarative/requesters/query_properties/strategies/__init__.py (3)
airbyte_cdk/sources/declarative/requesters/query_properties/strategies/emit_partial_record.py (1)
EmitPartialRecord
(13-23)airbyte_cdk/sources/declarative/requesters/query_properties/strategies/group_by_key.py (1)
GroupByKey
(13-33)airbyte_cdk/sources/declarative/requesters/query_properties/strategies/merge_strategy.py (1)
RecordMergeStrategy
(11-19)
airbyte_cdk/sources/declarative/requesters/query_properties/strategies/emit_partial_record.py (3)
airbyte_cdk/sources/declarative/requesters/query_properties/strategies/merge_strategy.py (2)
RecordMergeStrategy
(11-19)get_group_key
(18-19)airbyte_cdk/sources/declarative/models/declarative_component_schema.py (16)
Config
(132-133)Config
(146-147)Config
(160-161)Config
(174-175)Config
(192-193)Config
(206-207)Config
(220-221)Config
(234-235)Config
(248-249)Config
(262-263)Config
(276-277)Config
(290-291)Config
(306-307)Config
(320-321)Config
(334-335)Config
(373-374)airbyte_cdk/sources/declarative/requesters/query_properties/strategies/group_by_key.py (1)
get_group_key
(25-33)
airbyte_cdk/sources/declarative/requesters/query_properties/__init__.py (4)
airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py (1)
PropertiesFromEndpoint
(14-40)airbyte_cdk/sources/declarative/models/declarative_component_schema.py (3)
PropertiesFromEndpoint
(2325-2336)PropertyChunking
(1215-1234)QueryProperties
(2339-2356)airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py (1)
PropertyChunking
(24-68)airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py (1)
QueryProperties
(14-51)
airbyte_cdk/sources/declarative/requesters/query_properties/strategies/merge_strategy.py (2)
airbyte_cdk/sources/declarative/requesters/query_properties/strategies/emit_partial_record.py (1)
get_group_key
(22-23)airbyte_cdk/sources/declarative/requesters/query_properties/strategies/group_by_key.py (1)
get_group_key
(25-33)
unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py (1)
airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py (4)
PropertyChunking
(24-68)PropertyLimitType
(14-20)get_request_property_chunks
(41-65)get_merge_key
(67-68)
unit_tests/sources/declarative/requesters/query_properties/test_group_by_key.py (4)
airbyte_cdk/sources/declarative/requesters/query_properties/strategies/group_by_key.py (2)
GroupByKey
(13-33)get_group_key
(25-33)unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py (1)
test_get_merge_key
(87-98)airbyte_cdk/sources/declarative/requesters/query_properties/strategies/emit_partial_record.py (1)
get_group_key
(22-23)airbyte_cdk/sources/declarative/requesters/query_properties/strategies/merge_strategy.py (1)
get_group_key
(18-19)
unit_tests/sources/declarative/retrievers/test_simple_retriever.py (4)
airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py (2)
PropertyChunking
(24-68)PropertyLimitType
(14-20)airbyte_cdk/sources/declarative/requesters/query_properties/strategies/group_by_key.py (1)
GroupByKey
(13-33)airbyte_cdk/sources/types.py (4)
Record
(20-63)associated_slice
(38-39)StreamSlice
(66-160)data
(34-35)airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (4)
SimpleRetriever
(54-643)name
(106-114)name
(117-119)read_records
(440-559)
airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py (1)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (1)
read_records
(440-559)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (2)
airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py (3)
QueryProperties
(14-51)get_request_property_chunks
(28-42)has_multiple_chunks
(44-51)airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py (2)
get_request_property_chunks
(41-65)get_merge_key
(67-68)
airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py (2)
airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py (2)
PropertiesFromEndpoint
(14-40)get_properties_from_endpoint
(30-40)airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py (2)
PropertyChunking
(24-68)get_request_property_chunks
(41-65)
airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py (4)
airbyte_cdk/sources/declarative/requesters/query_properties/strategies/group_by_key.py (2)
GroupByKey
(13-33)get_group_key
(25-33)airbyte_cdk/sources/declarative/requesters/query_properties/strategies/merge_strategy.py (2)
RecordMergeStrategy
(11-19)get_group_key
(18-19)airbyte_cdk/sources/declarative/models/declarative_component_schema.py (16)
Config
(132-133)Config
(146-147)Config
(160-161)Config
(174-175)Config
(192-193)Config
(206-207)Config
(220-221)Config
(234-235)Config
(248-249)Config
(262-263)Config
(276-277)Config
(290-291)Config
(306-307)Config
(320-321)Config
(334-335)Config
(373-374)airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py (1)
get_request_property_chunks
(28-42)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2)
airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py (2)
PropertyLimitType
(14-20)PropertyChunking
(24-68)airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (1)
SimpleRetriever
(54-643)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (3)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (4)
PropertyChunking
(1215-1234)PropertiesFromEndpoint
(2325-2336)QueryProperties
(2339-2356)PropertyLimitType
(1210-1212)airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py (2)
PropertyChunking
(24-68)PropertyLimitType
(14-20)airbyte_cdk/sources/declarative/requesters/query_properties/strategies/group_by_key.py (1)
GroupByKey
(13-33)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (6)
EmitPartialRecordMergeStrategy
(350-352)GroupByKeyMergeStrategy
(728-736)PropertiesFromEndpoint
(2325-2336)PropertyChunking
(1215-1234)PropertyLimitType
(1210-1212)QueryProperties
(2339-2356)
🔇 Additional comments (57)
unit_tests/sources/declarative/requesters/query_properties/__init__.py (1)
1-1
: LGTM! The copyright notice is correctly in place.Adding just a copyright notice without any imports is typical for an empty
__init__.py
file to mark a directory as a Python package. The AI summary mentions imports that aren't visible in the provided code, but this might be part of future changes.airbyte_cdk/sources/declarative/requesters/query_properties/strategies/__init__.py (1)
1-13
: LGTM! The imports and exports are correctly structured.The file properly imports the required strategy classes and exports them via
__all__
, following Python best practices for package organization. This provides a clean interface for importing these strategies from other modules.airbyte_cdk/sources/declarative/requesters/query_properties/__init__.py (1)
1-13
: LGTM! The imports and exports are correctly structured.The file properly imports the required classes (
PropertiesFromEndpoint
,PropertyChunking
, andQueryProperties
) and exports them via__all__
, which follows Python best practices for package organization. This provides a clean interface for importing these components from other modules.unit_tests/sources/declarative/requesters/query_properties/test_group_by_key.py (2)
9-36
: LGTM! Well-structured parameterized tests.The test cases cover different scenarios for the
get_group_key
method, including single keys, multiple keys, and missing keys. The parameterization makes the tests concise and maintainable.
37-41
: LGTM! The test implementation is clear and concise.The test creates a
GroupByKey
instance with the provided key, calls theget_group_key
method with the provided record, and verifies the result matches the expected value.airbyte_cdk/sources/declarative/requesters/query_properties/strategies/merge_strategy.py (1)
1-20
: Well-designed abstract base class for record merging strategies!This is a nice, clean implementation of the
RecordMergeStrategy
abstract base class that establishes a clear interface for how records requiring multiple requests should be merged. The abstractget_group_key
method provides a clear contract for subclasses to implement.The design allows for different merging strategies like
EmitPartialRecord
andGroupByKey
to implement their own grouping logic while sharing a common interface. This will make it easier to extend with additional strategies in the future.airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py (1)
43-43
: Clear addition of query_properties_key with proper type annotation.Good addition of this optional field to support query properties in request options. The type annotation is appropriate here.
airbyte_cdk/sources/declarative/requesters/query_properties/strategies/group_by_key.py (2)
1-24
: Clean implementation with good initialization logic.The
GroupByKey
class has a nice implementation with appropriate type hints and initialization logic. The__post_init__
method correctly handles both string and list inputs for thekey
parameter, ensuring consistent internal representation.
25-33
: Good implementation of get_group_key, but what about array values?The method correctly handles missing keys by returning
None
when any key is missing. This ensures records don't get incorrectly grouped.One question: how should this handle cases where
key_value
might be an array/list itself? Currently, it would just append the array as-is toresolved_keys
, which might lead to unexpected behavior when joining. Should there be specific handling for this case or is the current behavior intentional, wdyt?airbyte_cdk/sources/declarative/requesters/query_properties/strategies/emit_partial_record.py (1)
1-24
: Simple and effective implementation for emitting partial records.This is a straightforward implementation of
RecordMergeStrategy
that always returnsNone
fromget_group_key
, indicating that records should not be grouped. The class name and docstring clearly communicate its purpose of emitting partial records without merging.This strategy will be useful in scenarios where records don't have suitable primary keys for merging.
unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py (1)
60-86
: Clear and concise test forget_request_property_chunks
.
Everything looks great here. The iteration and assertions align well with chunked outputs. No immediate concerns.airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (2)
6-6
: Good use ofdefaultdict
for record merging.
Pulling indefaultdict
is a neat way to handle the multi-chunk merging logic. No objections here.
533-559
: Handling Full Refresh for chunk-less scenario.
This logic effectively checks theFULL_REFRESH_SYNC_COMPLETE_KEY
. Everything looks well integrated. No issues spotted.airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py (1)
19-28
: Check if multipleproperty_field_path
levels are needed.
You handle a list of path segments, which is good. If the path is deeply nested, do you want to confirm that each segment evaluation works properly, wdyt?unit_tests/sources/declarative/requesters/request_options/test_interpolated_request_options_provider.py (2)
184-213
: Good job on thorough test coverage for query properties validation!I like how you've covered a comprehensive set of test cases using parameterization, testing both valid cases and various error scenarios. This will ensure robust validation of query properties in stream slices.
214-232
: Clear test implementation with appropriate assertions.The test function implementation is well-structured with clear conditional logic for handling both error and success cases. The assertions properly validate the expected behavior.
unit_tests/sources/declarative/requesters/query_properties/test_properties_from_endpoint.py (3)
12-54
: Well-designed test for basic property extraction!Good job creating this test with a comprehensive list of expected properties and clear verification. The mock setup is clean and the assertions validate both the count and content of the properties.
57-99
: Great test for handling nested properties!This test nicely validates the ability to extract properties from nested structures using multiple field paths. The test data is well-structured and the assertions are appropriate.
102-145
: Excellent test for interpolation functionality!I appreciate how you've tested the dynamic field path interpolation using a config parameter. This ensures the flexibility of the implementation for real-world usage where paths may be configuration-driven.
unit_tests/sources/declarative/retrievers/test_simple_retriever.py (5)
33-40
: Clean imports for the new query properties components.The imports are well-organized and focused on the new functionality being tested.
1003-1003
: Minor name change in test data.Simple update from "erobertric" to "robert" in the test data. This looks like a straightforward correction or preference change.
1105-1266
: Comprehensive test for property chunking across multiple requests!This test thoroughly validates the core functionality of retrieving and merging records across multiple property chunks. The test data is well-structured with a clear expected outcome, and the test covers the complete workflow from request to record merging.
1269-1443
: Good boundary case test for single chunk processing!I like how you're testing the boundary condition where all properties fit within a single chunk (property_limit of 10). This ensures the chunking logic works correctly even when chunking isn't strictly needed.
1446-1602
: Important test for graceful handling of missing merge keys!This test ensures that records are still emitted even when a valid merge key isn't available (using "not_real" as the key). This is crucial for fault tolerance in real-world scenarios where data might not always match expectations.
airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py (3)
23-40
: Good class design with sensible defaults!I like how you've designed the
PropertyChunking
class with clear attributes and a sensible default for the merge strategy. The class docstring effectively explains the purpose of this component.
41-65
: Well-implemented chunking method with comprehensive edge cases!The
get_request_property_chunks
method is well-implemented with handling for various scenarios:
- When no property limit is set
- Respecting the always_include_properties in each chunk
- Different sizing based on the limit type
The logic for determining when to create a new chunk is clean and easy to follow.
67-68
: Concise delegation to the merge strategy.The
get_merge_key
method cleanly delegates to the underlying merge strategy, maintaining good separation of concerns.airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py (2)
13-27
: This component design looks well-thought out!The class structure with support for both static property lists and dynamic retrieval is elegant. I particularly like how you've made both the
always_include_properties
andproperty_chunking
optional, giving implementers flexibility based on their specific API requirements.
44-51
: The implementation for checking multiple chunks is cleverNice approach to checking for multiple chunks by trying to get two chunks from the iterator! This is a clean way to determine if chunking will actually result in multiple API calls without having to materialize all chunks.
unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py (3)
1-17
: Good test setup with proper importsThe imports and test setup look good! I like that you're properly importing the necessary components and mocks for testing.
21-54
: Great test for chunking with static property listThe test is well-structured and validates the correct behavior of property chunking with a static list. I like the clear assertions that verify both the number of chunks and their contents.
56-89
: Thorough testing of always_include_properties functionalityGood job testing the always_include_properties feature! The assertions clearly verify that "zero" is included at the beginning of each chunk, which is exactly what we'd expect.
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (3)
728-737
: Validate non-empty key?
Would you consider ensuring that thekey
field is not empty or None, possibly by adding a Pydantic validator? wdyt?
1210-1213
: Looks correct.
These enum entries appear aligned with the intended usage.
2339-2357
: Check for duplication with always-include properties.
Ifproperty_chunking
is used along withalways_include_properties
, do you want logic to avoid duplicated properties in the final list? wdyt?unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (7)
75-75
: LGTM! Added import for the new PropertyChunking model.This import is needed for the new test cases that verify property chunking functionality.
128-137
: LGTM! Added imports for query properties components.These imports are necessary for the new test cases that verify the QueryProperties, PropertiesFromEndpoint, PropertyChunking, PropertyLimitType, and GroupByKey components.
4012-4107
: LGTM! Well-structured test for SimpleRetriever with QueryProperties.This test validates that:
- The QueryProperties object is correctly created with property list and always_include_properties
- PropertyChunking is configured correctly with property_count limit type
- GroupByKey merge strategy is set up correctly
- The request_options_provider properly handles the QueryProperties
The test aligns with the PR objective of supporting static property lists like in source-linkedin-ads.
4108-4203
: LGTM! Good test coverage for SimpleRetriever with PropertiesFromEndpoint.This test ensures that properties can be dynamically fetched from an endpoint, aligning with the PR objective of supporting the pattern used in source-hubspot. The test validates:
- Properties can be fetched from an endpoint
- The nested SimpleRetriever is properly configured
- Property chunking is correctly set up
4204-4261
: LGTM! Good validation test for incorrect query property types.This test verifies that an error is raised when something that is not a QueryProperties object is used in the request_parameters. Using a ListPartitionRouter as the test case provides good coverage for type safety.
4263-4346
: LGTM! Proper error handling for multiple QueryProperties.This test ensures that an error is raised when multiple QueryProperties objects are provided in the request_parameters, preventing potential misconfigurations.
4348-4366
: LGTM! Good test for character-based property chunking.This test verifies that PropertyChunking can be created with the "characters" limit type, which is useful for APIs that have character-length limitations on property lists.
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (15)
2-2
: Check copyright year.
This updated year seems correct for 2025. Would you consider verifying it's in sync with your project’s timeline? wdyt?
228-230
: New import for EmitPartialRecordMergeStrategyModel
The import aligns well with its usage in the constructor map below. wdyt?
240-242
: New import for GroupByKeyMergeStrategyModel
This matches the newcreate_group_by_key
function. Looks good. wdyt?
333-335
: Imported PropertiesFromEndpointModel
Including this import is consistent with its usage increate_properties_from_endpoint
. wdyt?
336-338
: Imported PropertyChunkingModel
No issues; it’s properly referenced increate_property_chunking
. wdyt?
339-341
: Imported PropertyLimitTypeModel
This addition is consistent with your property limit logic. wdyt?
342-344
: Imported QueryPropertiesModel
No concerns here; it supports the new query properties feature. wdyt?
453-457
: Runtime class imports for QueryProperties
Bringing these in fromquery_properties
complements the pydantic model imports. wdyt?
458-460
: Imported PropertyLimitType
Consistently follows your property handling logic. wdyt?
461-464
: Imported EmitPartialRecord and GroupByKey
No issues with these additions. wdyt?
626-626
: Mapping EmitPartialRecordMergeStrategyModel
This entry ensures the factory can build the correct merge strategy. wdyt?
630-630
: Mapping GroupByKeyMergeStrategyModel
This binds the newcreate_group_by_key
method. All good. wdyt?
2184-2184
: Passing query_properties_key to the request options
Matches the newly introduced parameter and looks fine. wdyt?
2925-2931
: Creating the requester with query_properties_key
Ties into the new parameter seamlessly. wdyt?
3037-3037
: Supplying additional_query_properties to SimpleRetriever
This references the property set for query chunking. Looks consistent. wdyt?
airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py
Outdated
Show resolved
Hide resolved
query_properties_key = key | ||
query_properties_definitions.append(request_parameter) | ||
elif not isinstance(request_parameter, str): | ||
raise ValueError( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I couldn't get json schema to pydantic model conversions to enforce str
, QueryProperties
. It always would get rendered as Any
regardless of what combo I made once we included both a #ref
and concrete type
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can add this comment to the code directly because I would be surprised to see this validation and I would probably search for the explanation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just need a bit more information and I'll review this again. I can definitely say is that this YAML interface is clean and I really like it
@@ -1044,6 +1044,18 @@ definitions: | |||
$parameters: | |||
type: object | |||
additionalProperties: true | |||
EmitPartialRecordMergeStrategy: | |||
title: Emit Partial Record | |||
description: Record merge strategy where in the case where multiple requests are needed to retrieve all properties, properties are not consolidated back into a single record and are instead emitted as separate groups of properties. This strategy should only be used when records do not have a unique identifier like a primary key. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have we checked the streams that use this strategies in HubSpot? Checking real quick on CompaniesPropertyHistory
, it seems like there is a primary key (see image) but I remember us saying that this strategy was used.
Therefore it would either seem like:
- I'm wrong and
CompaniesPropertyHistory
does not use this strategy - It is not only a question of having a PK or not and we should improve this description to clarify that
- There is a bug in source-hubspot
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we had talked about this before, but it wasn't just about the primary key, but whether denormalize_records
was True
and if it was, then we do not perform record merging. Only the property history streams change this value. See https://github.com/airbytehq/airbyte/blob/01c9c1a76bd4c4330e18185e789421ccdbd3090f/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py#L2018
Ultimately I don't think I fully know the context of why we denormalize records for these streams. So I would be okay taking out this merge strategy and waiting until we get to these streams on Hubspot and adding it back if you don't want us to support this yet
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Source: airbytehq/airbyte#13596 which was related this this oncall issue
Checking more comments, it seems like the properties history streams didn't have pk before:
- https://github.com/airbytehq/oncall/issues/259#issuecomment-1149506504
- PropertyHistory was as full fledge stream which wasn't delineated into more precise propertyhistory like
CompaniesPropertyHistory
where no PKs were defined
One interesting thing I see is that the timestamp is on the primary key (example for CompaniesPropertyHistory). The same applies to ContactsPropertyHistory even though it's not PropertyHistoryV3.
Honestly, it sounds to me like it only works because of the timestamp in the PK and therefore, we could have the merge strategy for this as well. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mmk yep you've convinced me, I can take out this strategy, no need to support it if it doesn't feel like we need it or hubspot can be modified to not use this strategy at all
query_properties_key = key | ||
query_properties_definitions.append(request_parameter) | ||
elif not isinstance(request_parameter, str): | ||
raise ValueError( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can add this comment to the code directly because I would be surprised to see this validation and I would probably search for the explanation
@dataclass | ||
class PropertiesFromEndpoint: | ||
""" | ||
tbd |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tbd
airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py
Show resolved
Hide resolved
|
||
class PropertyLimitType(Enum): | ||
""" | ||
yeah |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah
airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py
Show resolved
Hide resolved
else: | ||
yield from [list(fields)] | ||
|
||
def has_multiple_chunks(self, stream_slice: Optional[StreamSlice]) -> bool: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: It feels like we could let that to the consumer of the query properties (in our case, SimpleRetriever) we don't need to maintain this new method. Do we know in which other case we would need this method? I'm mostly worried about maintaining multiple methods that might generate multiple same HTTP requests multiple times
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that's fair and it does give the impression this is a "cheap" method, but in reality makes an API call. The main reason i ended up adding it was that we needed to introspect the iterable to detect if there were at least two elements. And we've been burned in the past with tee()
not deallocating slices properly.
But I think we can move this method up into the retriever and if we combine with caching get_request_property_chunks()
that should make things less require fewer outbound API calls. Thanks for the suggestion
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we needed to introspect the iterable to detect if there were at least two elements
We could also increment a variable to know how many chunks were generated and take action on this. I don't have a preference though
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought about that too, but one problem with that is that is we want to know whether we need to start accumulating records in the merge dictionary on the first chunk, so the incrementing variable only allows us to know on the second loop. So that was why the alternative was a tee()
.
Alternatively, I don't love it, but I would also expect the property list to generally not be that big, so we could convert it to a list and just get the length. but I still lean towards just using this method twice since it should be cached
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we want to know whether we need to start accumulating records in the merge dictionary on the first chunk
Can we just always accumulate stuff even if we have only one chunk? My reasoning is that I prefer a generic logic that is simpler to understand than memory optimization that should be small because otherwise we would have an issue when there are multiple chunks.
That being said, I'm fine with keeping the current solution.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think that makes sense, I tweaked the design a bit to be agnostic to how many chunks. And also made a few adjustments so that even if we don't get a merge key, we still ensure that we emit the record
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (8)
airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py (2)
1-1
: Copyright year is set to 2025I noticed the copyright year is set to 2025, which seems to be in the future. Should this be updated to the current year instead? wdyt?
- # Copyright (c) 2025 Airbyte, Inc., all rights reserved. + # Copyright (c) 2023 Airbyte, Inc., all rights reserved.
52-66
: Consider handling case when a single property exceeds the limitWhat happens if a single property's character length exceeds the property_limit? Currently, it would still be added to a chunk, which might exceed the API's limit. Should there be special handling for this case? Perhaps a warning log or raising an exception? wdyt?
property_field_size = ( len(property_field) if self.property_limit_type == PropertyLimitType.characters else 1 ) + # Handle case where a single property exceeds the limit + if property_field_size > self.property_limit and len(current_chunk) == (len(always_include_properties) if always_include_properties else 0): + logger.warning(f"Property '{property_field}' with size {property_field_size} exceeds the limit of {self.property_limit}") if chunk_size + property_field_size > self.property_limit: yield current_chunk current_chunk = list(always_include_properties) if always_include_properties else [] chunk_size = 0airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py (1)
25-30
: Consider validating an empty property_field_path.If
property_field_path
is empty, would it cause unexpected behavior elsewhere? Maybe raise a warning or handle that scenario, wdyt?airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (5)
2-2
: Would you consider referencing this new year in docstrings too?
I see the project year updated here. If this is the only file referencing the year, that might be slightly inconsistent with other placeholders. wdyt?
808-813
: Implementation ofcreate_emit_partial_record
is concise.
Would you consider adding a short docstring or minimal validation (e.g., ifconfig
is empty)? wdyt?
2142-2145
:create_group_by_key
also looks straightforward.
Should we add a quick assertion if the provided key is empty? wdyt?
2886-2927
: Potential concern with in-place modification ofrequest_parameters
.
Might it introduce side effects for subsequent usage? Would a shallow copy be safer? wdyt?
3044-3054
:_remove_query_properties
logic is clean but might need a small guard check.
For example, verifying'type'
is present before comparing to"QueryProperties"
. wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
(14 hunks)airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py
(1 hunks)airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py
(1 hunks)airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py
(1 hunks)unit_tests/sources/declarative/test_manifest_declarative_source.py
(0 hunks)
💤 Files with no reviewable changes (1)
- unit_tests/sources/declarative/test_manifest_declarative_source.py
🚧 Files skipped from review as they are similar to previous changes (1)
- airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py
🧰 Additional context used
🧬 Code Definitions (3)
airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py (1)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (1)
read_records
(440-559)
airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py (3)
airbyte_cdk/sources/declarative/requesters/query_properties/strategies/group_by_key.py (2)
GroupByKey
(13-33)get_group_key
(25-33)airbyte_cdk/sources/declarative/requesters/query_properties/strategies/merge_strategy.py (2)
RecordMergeStrategy
(11-19)get_group_key
(18-19)airbyte_cdk/sources/declarative/models/declarative_component_schema.py (18)
Config
(132-133)Config
(146-147)Config
(160-161)Config
(174-175)Config
(192-193)Config
(206-207)Config
(220-221)Config
(234-235)Config
(248-249)Config
(262-263)Config
(276-277)Config
(290-291)Config
(306-307)Config
(320-321)Config
(334-335)Config
(373-374)PropertyLimitType
(1210-1212)PropertyChunking
(1215-1234)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (6)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (22)
EmitPartialRecordMergeStrategy
(350-352)GroupByKeyMergeStrategy
(728-736)PropertiesFromEndpoint
(2325-2336)PropertyChunking
(1215-1234)PropertyLimitType
(1210-1212)QueryProperties
(2339-2356)Config
(132-133)Config
(146-147)Config
(160-161)Config
(174-175)Config
(192-193)Config
(206-207)Config
(220-221)Config
(234-235)Config
(248-249)Config
(262-263)Config
(276-277)Config
(290-291)Config
(306-307)Config
(320-321)Config
(334-335)Config
(373-374)airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py (1)
PropertiesFromEndpoint
(14-40)airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py (2)
PropertyChunking
(25-69)PropertyLimitType
(14-21)airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py (1)
QueryProperties
(14-51)airbyte_cdk/sources/declarative/requesters/query_properties/strategies/emit_partial_record.py (1)
EmitPartialRecord
(13-23)airbyte_cdk/sources/declarative/requesters/query_properties/strategies/group_by_key.py (1)
GroupByKey
(13-33)
⏰ Context from checks skipped due to timeout of 90000ms (9)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Analyze (python)
🔇 Additional comments (19)
airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py (5)
14-22
: LGTM! Well-defined enum for property limit typesThe PropertyLimitType enum clearly defines two approaches for chunking properties - by character count or by property count. The docstring explains the purpose well.
24-36
: Well-structured PropertyChunking classThe PropertyChunking class is well-defined with clear parameters and types. I appreciate the detailed docstring explaining its purpose.
37-40
: Default merge strategy is sensibleUsing GroupByKey with "id" as the default makes sense as a reasonable fallback when no specific merge strategy is provided.
68-69
: LGTM! Clean delegation to the merge strategyThis method cleanly delegates to the underlying merge strategy, maintaining good separation of concerns.
42-51
:❓ Verification inconclusive
Consider handling edge case with empty property lists
The method correctly handles the case when no property limit is set, but what about when both
property_fields
andalways_include_properties
are empty? It would yield an empty list. Is this the expected behavior or should there be a validation check? wdyt?
🏁 Script executed:
#!/bin/bash # Check if there are any tests that verify the behavior with empty lists rg -A 10 -B 5 "def test.*empty.*property" --glob "test_*.py" # Or any tests for the PropertyChunking class rg -A 10 "class Test.*PropertyChunking" --glob "test_*.py"Length of output: 123
Edge Case Handling in
get_request_property_chunks
It looks like when both
property_fields
andalways_include_properties
are empty, the method yields an empty list without any explicit validation. Since our search didn't surface any tests or existing documentation addressing this scenario, could you please confirm whether returning an empty list is indeed the expected behavior or if we should introduce an additional validation check? wdyt?airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py (7)
1-2
: License header looks good.No concerns here.
3-5
: Imports from dataclasses and typing look standard.No issues.
6-7
: Confirm pinned dpath version?We’re importing dpath here. Sometimes pinned versions are required to avoid unexpected behavior. Would you consider verifying or pinning the version in your dependencies, wdyt?
8-11
: Imports from internal modules look appropriate.No immediate concerns.
13-19
: Docstring is clear and comprehensive.This docstring nicely explains the usage and intent of dynamically retrieving properties.
20-24
: All fields seem well-defined.Everything is properly typed and complements the docstring description.
31-41
: Potential caching and error handling enhancements for get_properties_from_endpoint.
- If this method is repeatedly fetching properties for each slice, would you consider enabling caching at the
Retriever
level, wdyt?- Also, referencing , a previous reviewer suggested ensuring robust error handling / logging when
dpath.get
path is missing, especially if that indicates unexpected data from the API. Right now, a default[]
is returned, but do we need to raise or log a warning in that scenario, wdyt?airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (7)
228-230
: No issues spotted in the new import.
It appears neatly added for theEmitPartialRecordMergeStrategyModel
. wdyt?
240-242
: The new import for GroupByKey seems coherent.
Seems like you’ve aligned with the naming from the rest of the code. wdyt?
333-335
: ImportingPropertiesFromEndpointModel
looks straightforward.
Everything appears in order. wdyt?
337-339
: AddingPropertyChunkingModel
also looks good.
No concerns, wdyt?
340-341
: ImportingPropertyLimitTypeModel
is consistent.
No further issues here, wdyt?
342-344
: IntroducingQueryPropertiesModel
.
This import is well-aligned with the rest of your new property management logic. wdyt?
453-464
: All new imports fromquery_properties
seem to align with your approach for property handling.
Would you consider grouping them on fewer lines, or do you prefer the brevity here? wdyt?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (8)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (5)
2-2
: Update copyright year to 2023 or 2024?The copyright year is set to 2025, which appears to be in the future. Should this be updated to the current year instead?
-# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
453-463
: Consider extracting property chunk initialization logicThe initialization logic for property chunks could be extracted to a separate method for better readability. This would also simplify the
read_records
method which is becoming quite complex.def read_records( self, records_schema: Mapping[str, Any], stream_slice: Optional[StreamSlice] = None, ) -> Iterable[StreamData]: """ Fetch a stream's records from an HTTP API source :param records_schema: json schema to describe record :param stream_slice: The stream slice to read data for :return: The records read from the API source """ + property_chunks, has_multiple_chunks = self._initialize_property_chunks(stream_slice) + merged_records: MutableMapping[str, Any] = defaultdict(dict) + _slice = stream_slice or StreamSlice(partition={}, cursor_slice={}) # None-check most_recent_record_from_slice = None + # Rest of the method... + +def _initialize_property_chunks(self, stream_slice: Optional[StreamSlice]) -> Tuple[List[List[str]], bool]: if self.additional_query_properties: property_chunks = list( self.additional_query_properties.get_request_property_chunks( stream_slice=stream_slice ) ) has_multiple_chunks = self._has_multiple_chunks(stream_slice=stream_slice) else: property_chunks = [[""]] has_multiple_chunks = False - merged_records: MutableMapping[str, Any] = defaultdict(dict) - _slice = stream_slice or StreamSlice(partition={}, cursor_slice={}) # None-check + return property_chunks, has_multiple_chunks
464-512
: Consider splitting property chunking logic into a separate methodThe handling of property chunks and record merging makes the method quite complex. Consider extracting this logic into a separate method for better readability and maintainability. This would also make unit testing easier.
if self.additional_query_properties: - for properties in property_chunks: - _slice = StreamSlice( - partition=_slice.partition or {}, - cursor_slice=_slice.cursor_slice or {}, - extra_fields={"query_properties": properties}, - ) # None-check - - record_generator = partial( - self._parse_records, - stream_slice=_slice, - stream_state=self.state or {}, - records_schema=records_schema, - ) - - for stream_data in self._read_pages(record_generator, self.state, _slice): - current_record = self._extract_record(stream_data, _slice) - if self.cursor and current_record: - self.cursor.observe(_slice, current_record) - - # Latest record read, not necessarily within slice boundaries. - # TODO Remove once all custom components implement `observe` method. - # https://github.com/airbytehq/airbyte-internal-issues/issues/6955 - most_recent_record_from_slice = self._get_most_recent_record( - most_recent_record_from_slice, current_record, _slice - ) - - # Record merging should only be done if there are multiple property chunks. Otherwise, - # yielding immediately is more efficient so records can be emitted immediately - if ( - has_multiple_chunks - and self.additional_query_properties.property_chunking - and current_record - ): - merge_key = ( - self.additional_query_properties.property_chunking.get_merge_key( - current_record - ) - ) - if merge_key: - merged_records[merge_key].update(current_record) - else: - yield stream_data - else: - yield stream_data + most_recent_record_from_slice, merged_records = self._process_property_chunks( + property_chunks, + has_multiple_chunks, + _slice, + most_recent_record_from_slice, + merged_records, + records_schema + ) if self.cursor: self.cursor.close_slice(_slice, most_recent_record_from_slice)Then add a new method:
def _process_property_chunks( self, property_chunks: List[List[str]], has_multiple_chunks: bool, base_slice: StreamSlice, most_recent_record: Optional[Record], merged_records: MutableMapping[str, Any], records_schema: Mapping[str, Any] ) -> Tuple[Optional[Record], MutableMapping[str, Any]]: """ Process each property chunk and handle record merging if needed. Returns: Tuple containing the most recent record and the merged records dictionary """ for properties in property_chunks: _slice = StreamSlice( partition=base_slice.partition or {}, cursor_slice=base_slice.cursor_slice or {}, extra_fields={"query_properties": properties}, ) record_generator = partial( self._parse_records, stream_slice=_slice, stream_state=self.state or {}, records_schema=records_schema, ) for stream_data in self._read_pages(record_generator, self.state, _slice): current_record = self._extract_record(stream_data, _slice) if self.cursor and current_record: self.cursor.observe(_slice, current_record) most_recent_record = self._get_most_recent_record( most_recent_record, current_record, _slice ) # Record merging should only be done if there are multiple property chunks if ( has_multiple_chunks and self.additional_query_properties.property_chunking and current_record ): merge_key = ( self.additional_query_properties.property_chunking.get_merge_key( current_record ) ) if merge_key: merged_records[merge_key].update(current_record) else: yield stream_data else: yield stream_data return most_recent_record, merged_records
599-612
: Use existing method from QueryPropertiesThe
_has_multiple_chunks
method duplicates logic that already exists in theQueryProperties
class. Consider reusing the existing method instead of reimplementing the same logic, wdyt?def _has_multiple_chunks(self, stream_slice: Optional[StreamSlice]) -> bool: if not self.additional_query_properties: return False - - property_chunks = iter( - self.additional_query_properties.get_request_property_chunks(stream_slice=stream_slice) - ) - try: - next(property_chunks) - next(property_chunks) - return True - except StopIteration: - return False + + return self.additional_query_properties.has_multiple_chunks(stream_slice)
453-557
: Add logging for debugging property chunkingFor operations that involve multiple API requests and potentially complex data merging, adding detailed logging would help with debugging and understanding the flow. Consider adding debug-level logs to track the property chunking and record merging process.
if self.additional_query_properties: + logger.debug(f"Stream {self.name}: Processing property chunks for stream slice {stream_slice}") property_chunks = list( self.additional_query_properties.get_request_property_chunks( stream_slice=stream_slice ) ) has_multiple_chunks = self._has_multiple_chunks(stream_slice=stream_slice) + logger.debug(f"Stream {self.name}: Found {len(property_chunks)} property chunks. Multiple chunks: {has_multiple_chunks}") else: property_chunks = [[""]] has_multiple_chunks = FalseAnd when merging records:
if merge_key: + logger.debug(f"Stream {self.name}: Merging record with key {merge_key}") merged_records[merge_key].update(current_record) else: + logger.debug(f"Stream {self.name}: No merge key found, yielding record immediately") yield stream_dataunit_tests/sources/declarative/parsers/test_model_to_component_factory.py (3)
4203-4260
: Good validation of type checking!This test confirms proper error handling when a field expected to be
QueryProperties
is actually a different type. It's a valuable negative test case that helps ensure robust validation.Could we possibly enhance this by checking the specific error message content to make the test even more precise, wdyt?
- with pytest.raises(ValueError): + with pytest.raises(ValueError, match="Expected QueryProperties type but got"): factory.create_component( model_type=DeclarativeStreamModel, component_definition=stream_manifest, config=input_config, )
4262-4345
: Excellent test for multiple query properties validation!This test ensures we correctly reject configurations with multiple
QueryProperties
objects in request parameters, which could lead to ambiguity. Nice error case coverage!Similarly to the previous comment, verifying the specific error message might add an extra layer of clarity to what's being tested:
- with pytest.raises(ValueError): + with pytest.raises(ValueError, match="Multiple QueryProperties"): factory.create_component( model_type=DeclarativeStreamModel, component_definition=stream_manifest, config=input_config, )
4367-4382
: Nice validation error test!The test appropriately verifies that invalid
property_limit_type
values trigger validation errors. This is important for helping users identify configuration issues early.Since this is a pydantic
ValidationError
, would checking the specific validation message be valuable to ensure the error is precisely about the property limit type? For example:- with pytest.raises(ValidationError): + with pytest.raises(ValidationError) as exc_info: connector_builder_factory.create_component( model_type=PropertyChunkingModel, component_definition=property_chunking_model, config={}, ) + assert "property_limit_type" in str(exc_info.value)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py
(1 hunks)airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
(6 hunks)unit_tests/sources/declarative/parsers/test_model_to_component_factory.py
(4 hunks)unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py
- airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py
🧰 Additional context used
🧬 Code Definitions (2)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (2)
airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py (3)
QueryProperties
(14-52)get_request_property_chunks
(28-42)has_multiple_chunks
(45-52)airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py (2)
get_request_property_chunks
(42-66)get_merge_key
(68-69)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (3)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (5)
PropertyChunking
(1215-1234)PropertiesFromEndpoint
(2325-2336)QueryProperties
(2339-2356)PropertyLimitType
(1210-1212)SimpleRetriever
(2375-2429)airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py (2)
PropertyChunking
(25-69)PropertyLimitType
(14-21)airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py (1)
QueryProperties
(14-52)
⏰ Context from checks skipped due to timeout of 90000ms (9)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (Fast)
- GitHub Check: Analyze (python)
🔇 Additional comments (10)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (7)
6-6
: LGTM - defaultdict added for record mergingThis import is necessary for the new record merging functionality for property chunking.
16-16
: LGTM - MutableMapping added for type hintingAdding MutableMapping for proper type hinting of the defaultdict.
36-36
: LGTM - Import for QueryPropertiesThis import connects the new query properties functionality to the SimpleRetriever.
94-94
: LGTM - QueryProperties attribute addedThis new attribute will store the query properties configuration, if any, and is correctly defined as Optional.
515-519
: LGTM - Record merging after processing chunksThis logic correctly yields the merged records after all chunks have been processed. The
merged_records
dictionary collects records with the same merge key, and this code yields these combined records.
520-557
: LGTM - Original non-chunking approach preservedThe original approach for handling records without property chunking is correctly preserved with minimal modifications. This maintains backward compatibility with existing connectors.
462-462
: Why use [""] as default property chunk?When additional_query_properties is None, you're using
[[""]]
as the default property_chunks. Consider using an empty list[[]]
instead, as an empty string property might be interpreted differently by some APIs than no property at all. What do you think?unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (3)
4010-4096
: Nice implementation of the query properties test!The test effectively validates that a
SimpleRetriever
correctly processes static query properties, along with property chunking and merge strategy configuration. It's particularly good to see the verification that fields defined inrequest_parameters.query_properties
are correctly placed underSimpleRetriever.additional_query_properties
.Would adding a docstring to clarify the purpose of this test be helpful for future developers, wdyt?
4106-4202
: Great test for dynamic properties from endpoint!This thoroughly validates the
PropertiesFromEndpoint
functionality, including proper configuration of the nested retriever for fetching properties. The test verifies all the important aspects of the dynamic property retrieval feature.
4347-4365
: Good test for character-based property chunking!This test validates that a
PropertyChunking
instance with character-based limits is created correctly. It verifies both the chunking type and limit value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (5)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
2332-2350
: QueryProperties ties everything together nicely.This class provides a good abstraction for handling sets of properties in API requests. I like how it supports both static lists and dynamic properties from endpoints through the union type. The optional
always_include_properties
is a nice touch for ensuring critical fields are always present. Theproperty_chunking
integration completes the picture by allowing the properties to be split across multiple requests when needed.Have you considered adding some validation to ensure that properties in
always_include_properties
also exist in the main property list? This might prevent potential runtime errors, wdyt?airbyte_cdk/sources/declarative/declarative_component_schema.yaml (4)
1756-1779
: GroupByKeyMergeStrategy Definition Review
The newGroupByKeyMergeStrategy
component is well structured and adheres to our schema patterns. I wonder if it might be helpful to include a few more concrete examples—especially when grouping by composite keys—to guide users on how to structure thekey
field. wdyt?
3019-3048
: PropertiesFromEndpoint Component Review
ThePropertiesFromEndpoint
component is clearly defined and follows our established pattern. The use of ananyOf
for theretriever
field seems flexible enough. Would it be beneficial to add additional examples showing how theproperty_field_path
maps to API response fields, just to further clarify its behavior to users? wdyt?
3049-3076
: PropertyChunking Component Evaluation
ThePropertyChunking
component is defined clearly and distinguishes between limit types (characters
vs.property_count
). Do you think it might be useful to add some examples illustrating how theproperty_limit
is interpreted under each limit type? Also, should we consider whether therecord_merge_strategy
might need to support more than justGroupByKeyMergeStrategy
in future iterations? wdyt?
3077-3108
: QueryProperties Component Review
TheQueryProperties
component elegantly accommodates both static and dynamic definitions for the property list and integrates the property chunking configuration seamlessly. Would it be worth adding a few additional examples—perhaps one that demonstrates dynamic property retrieval usingPropertiesFromEndpoint
—to ensure clarity for users? Also, do we need any extra validation foralways_include_properties
? wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml
(4 hunks)airbyte_cdk/sources/declarative/models/declarative_component_schema.py
(7 hunks)airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
(15 hunks)airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py
(1 hunks)airbyte_cdk/sources/declarative/requesters/query_properties/strategies/__init__.py
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- airbyte_cdk/sources/declarative/requesters/query_properties/strategies/init.py
- airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
🔇 Additional comments (4)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (4)
723-731
: The GroupByKeyMergeStrategy looks well-defined for merging properties.This new class looks good - it provides the mechanism to merge records from multiple API requests based on a common key value. The key field can be a string or list of strings (for composite keys), which offers good flexibility. The description is clear about its purpose too.
1205-1227
: The property chunking functionality looks well-implemented.The new
PropertyLimitType
enum andPropertyChunking
class provide a clean way to handle APIs that limit the number of properties in requests. The enum offers two sensible limit types (character count or property count), and thePropertyChunking
class ties together the limit configuration with a merge strategy. This approach should work well for breaking requests into smaller pieces while keeping the code clean and maintainable.
2228-2228
: Improved type flexibility for request parameters looks good.The type annotation change from
Dict[str, str]
toDict[str, Union[str, Any]]
allows for more complex parameter values, which is necessary for the new query properties functionality. This change maintains backward compatibility while providing the flexibility needed for the new features.
2318-2330
: PropertiesFromEndpoint is well-structured for dynamic property retrieval.This new class enables fetching properties from a remote API endpoint rather than hardcoding them. It's a nice abstraction that correctly requires both the path to extract the properties and the retriever to make the request. This will enable connectors like
source-hubspot
to dynamically retrieve available properties.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (3)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (3)
462-463
: Data structures for record tracking and merging.Using a defaultdict for merged records is an elegant solution. Have you considered adding a comment explaining the structure and purpose of these collections to help future maintainers understand the pattern more quickly? wdyt?
-records_without_merge_key = [] -merged_records: MutableMapping[str, Any] = defaultdict(dict) +# Store records without merge keys for later processing +records_without_merge_key = [] +# Use defaultdict to group records by merge key for efficient merging +merged_records: MutableMapping[str, Any] = defaultdict(dict)
495-507
: Record merging logic based on property chunks.This is the core of the new functionality - merging records based on chunk processing. The code handles both cases with and without merge keys appropriately. However, should we add a debug log when a record doesn't have a merge key to help troubleshoot potential issues? wdyt?
else: # We should still emit records even if the record did not have a merge key + # Consider adding debug logging here if implementing logging records_without_merge_key.append(current_record)
453-555
: Consider adding method documentation for the enhanced functionality.The read_records method has been significantly enhanced, but the docstring wasn't updated to reflect these changes. Would it be helpful to expand the documentation to explain the property chunking and merging behavior for future developers? wdyt?
def read_records( self, records_schema: Mapping[str, Any], stream_slice: Optional[StreamSlice] = None, ) -> Iterable[StreamData]: """ Fetch a stream's records from an HTTP API source :param records_schema: json schema to describe record :param stream_slice: The stream slice to read data for :return: The records read from the API source + + When additional_query_properties is set, the method will: + 1. Retrieve property chunks from the query properties + 2. Process each chunk by creating a new stream slice with the properties + 3. Merge records based on a merge key if property_chunking is defined + 4. Yield both merged records and records without merge keys """
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
(5 hunks)
🧰 Additional context used
🧬 Code Definitions (1)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (3)
airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py (2)
QueryProperties
(14-58)get_request_property_chunks
(28-48)airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py (1)
get_request_property_chunks
(42-66)airbyte_cdk/sources/types.py (7)
StreamSlice
(66-160)partition
(90-95)cursor_slice
(98-103)extra_fields
(106-108)data
(34-35)values
(134-135)get
(137-138)
⏰ Context from checks skipped due to timeout of 90000ms (9)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Analyze (python)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
🔇 Additional comments (8)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (8)
6-6
: Adding defaultdict import for property merging.The code now utilizes
defaultdict
to simplify record merging when processing property chunks. This is a good choice for the implementation, providing a clean way to aggregate records by key.
36-36
: New import for QueryProperties functionality.The addition of the QueryProperties import supports the new feature for property chunking and dynamic property retrieval from API endpoints. This aligns well with the PR's objective of enhancing low-code CDK.
94-94
: New field added for query properties support.The
additional_query_properties
attribute enables the handling of query properties in the SimpleRetriever. This matches the described functionality in the PR summary where properties can be defined or fetched from endpoints.
453-461
: Property chunking initialization looks good.This code fetches property chunks from the additional_query_properties if available. The approach allows for breaking down properties into smaller chunks to accommodate API limitations, which is a key requirement mentioned in the PR objectives.
468-475
: Stream slice creation with query properties.The code builds a new stream slice that includes the query properties for each chunk. This approach maintains the original stream slice data while adding the specific properties for the current request.
476-493
: Record processing setup and cursor handling.The implementation preserves cursor functionality while incorporating the new property chunking feature. Good attention to detail in maintaining existing behavior for record tracking and cursor observation.
511-517
: Yielding merged and non-merge key records.The implementation correctly yields both types of records after processing all property chunks. This ensures we don't lose any data regardless of whether records could be merged or not.
518-555
: Maintaining backward compatibility.The code maintains the original behavior when no additional query properties are specified. This is excellent for backward compatibility and ensures existing connectors continue to work unchanged.
Closes https://github.com/airbytehq/airbyte-internal-issues/issues/12145
What
Adds support for the
QueryProperties
concept which allows for low-code connectors to inject either a static list of properties (source-linkedin-ads) or a set of properties retrieved from an API endpoint (source-hubspot
) into theHttpRequester.request_properties
See the spec for more details about what this feature is and how it is used by connectors:
https://docs.google.com/document/d/1B_WcIEHTzctwHX_QjZgONXVF9g0i7BwpKzwrHMIA59s/edit?tab=t.0#heading=h.5dmjlrjauglv
How
This PR has all the usual things associated with new components being added to the low-code framework. That includes updating
declarative_component_schema.yaml
, regenerating pydantic models, and updatingmodel_to_component_factory.py
to parse the models into runtime components.The part I want to call out around model parsing is that we do something of a unique (and maybe a bit confusing) transformation of the
HttpRequester.request_parameters.query_properties
model definition by moving it under theSimpleRetriever.additional_query_properties
for runtime. The optimal developer experience is to define it underrequest_parameters
because that is ultimately where it gets injected. However, the SimpleRetriever needs to have context into theQueryProperties
component because it orchestrates the requests being made and merging records back together.At runtime, PR can be broken down into the two pieces of functionality mentioned above
Defining or retrieving the complete set of request properties to inject:
The
QueryProperties
runtime class acts as the orchestrator behind getting the list of properties that need to be queried for either through the statically defined list or thePropertiesFromEndpoint
class. It then leveragesPropertyChunking
if defined, otherwise it returns a single set of properties. This way, allQueryProperty
output is processed in the same way by theSimpleRetriever
regardless of whether property chunking is defined.Some connectors like linkedin ads have a unique identifier based on property fields that must be requested and therefore they must be included in every grouping of properties. The
always_include_properties
field allows us to define fields we know must be added. And this allows us to avoid using interpolation to build a complex query field such as:fields: "{{ ','.join(['required', stream_partition.extra_fields['query_properties']) }}"
Performing chunking into smaller sets of properties for APIs with restrictions around requests:
The
PropertyChunking
runtime class is responsible for dictating when to break the complete set of components into smaller groups either by the character count or by the number of properties per group. It also supplies therecord_merge_strategy
which would only be needed when there are multiple groupsLoading the
query_properties
into the the outbound API requestThis is probably one of the more awkward parts of the code. We need some mechanism of passing the current chunk of
query_properties
to inject from the SimpleRetriever.read_recordsto the
InterpolatedRequestOptionsProvider. The most convenient way to avoid changing the interface is to use
extra_properties` which won't be persisted back as state.Even though jinja interpolation could work and might be more consistent so that every request_param is a string, it feels backwards to convert things back into interpolated strings when we can just operate on the
StreamSlice
directlySummary by CodeRabbit
New Features
Tests
SimpleRetriever
class.