diff --git a/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte_cdk/sources/declarative/manifest_declarative_source.py index 23d41b174..ae0fdcf6e 100644 --- a/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -387,6 +387,12 @@ def _dynamic_stream_configs( for dynamic_stream in components_resolver.resolve_components( stream_template_config=stream_template_config ): + dynamic_stream = { + **ManifestComponentTransformer().propagate_types_and_parameters( + "", dynamic_stream, {}, use_parent_parameters=True + ) + } + if "type" not in dynamic_stream: dynamic_stream["type"] = "DeclarativeStream" diff --git a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py index f2719bb14..6779b54ab 100644 --- a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py +++ b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py @@ -4,7 +4,7 @@ import copy import typing -from typing import Any, Mapping +from typing import Any, Mapping, Optional PARAMETERS_STR = "$parameters" @@ -94,6 +94,7 @@ def propagate_types_and_parameters( parent_field_identifier: str, declarative_component: Mapping[str, Any], parent_parameters: Mapping[str, Any], + use_parent_parameters: Optional[bool] = None, ) -> Mapping[str, Any]: """ Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the @@ -103,6 +104,7 @@ def propagate_types_and_parameters( :param declarative_component: The current component that is having type and parameters added :param parent_field_identifier: The name of the field of the current component coming from the parent component :param parent_parameters: The parameters set on parent components defined before the current component + :param use_parent_parameters: If set, parent parameters will be used as the source of truth when key names are the same :return: A deep copy of the transformed component with types and parameters persisted to it """ propagated_component = dict(copy.deepcopy(declarative_component)) @@ -130,7 +132,11 @@ def propagate_types_and_parameters( # level take precedence current_parameters = dict(copy.deepcopy(parent_parameters)) component_parameters = propagated_component.pop(PARAMETERS_STR, {}) - current_parameters = {**current_parameters, **component_parameters} + current_parameters = ( + {**component_parameters, **current_parameters} + if use_parent_parameters + else {**current_parameters, **component_parameters} + ) # Parameters should be applied to the current component fields with the existing field taking precedence over parameters if # both exist @@ -145,7 +151,10 @@ def propagate_types_and_parameters( excluded_parameter = current_parameters.pop(field_name, None) parent_type_field_identifier = f"{propagated_component.get('type')}.{field_name}" propagated_component[field_name] = self.propagate_types_and_parameters( - parent_type_field_identifier, field_value, current_parameters + parent_type_field_identifier, + field_value, + current_parameters, + use_parent_parameters=use_parent_parameters, ) if excluded_parameter: current_parameters[field_name] = excluded_parameter @@ -158,7 +167,10 @@ def propagate_types_and_parameters( f"{propagated_component.get('type')}.{field_name}" ) field_value[i] = self.propagate_types_and_parameters( - parent_type_field_identifier, element, current_parameters + parent_type_field_identifier, + element, + current_parameters, + use_parent_parameters=use_parent_parameters, ) if excluded_parameter: current_parameters[field_name] = excluded_parameter diff --git a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py index 09d069bff..e94f007e2 100644 --- a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py +++ b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py @@ -59,7 +59,9 @@ def to_configured_catalog( "type": "DynamicDeclarativeStream", "stream_template": { "type": "DeclarativeStream", - "name": "", + "$parameters": { + "name": "", + }, "primary_key": [], "schema_loader": { "type": "InlineSchemaLoader", @@ -145,7 +147,9 @@ def to_configured_catalog( "type": "DynamicDeclarativeStream", "stream_template": { "type": "DeclarativeStream", - "name": "", + "$parameters": { + "name": "", + }, "primary_key": [], "schema_loader": { "type": "InlineSchemaLoader", @@ -231,7 +235,9 @@ def to_configured_catalog( "type": "DynamicDeclarativeStream", "stream_template": { "type": "DeclarativeStream", - "name": "", + "$parameters": { + "name": "", + }, "primary_key": [], "schema_loader": { "type": "InlineSchemaLoader", @@ -331,7 +337,7 @@ def to_configured_catalog( "components_mapping": [ { "type": "ComponentMappingDefinition", - "field_path": ["name"], + "field_path": ["$parameters", "name"], "value": "parent_{{stream_slice['parent_id']}}_{{components_values['name']}}", }, { @@ -563,6 +569,10 @@ def test_dynamic_streams_with_http_components_resolver_retriever_with_parent_str catalog=None, state=None, ) + dynamic_streams = source._dynamic_stream_configs(source.resolved_manifest, _CONFIG) + + assert len(dynamic_streams) == 4 + assert dynamic_streams[0]["retriever"]["name"] == "parent_1_item_1" actual_catalog = source.discover(logger=source.logger, config=_CONFIG)