Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(low-code cdk): add type and parameters resolving for dynamic streams #439

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,12 @@ def _dynamic_stream_configs(
for dynamic_stream in components_resolver.resolve_components(
stream_template_config=stream_template_config
):
dynamic_stream = {
**ManifestComponentTransformer().propagate_types_and_parameters(
"", dynamic_stream, {}, use_parent_parameters=True
)
}

if "type" not in dynamic_stream:
dynamic_stream["type"] = "DeclarativeStream"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import copy
import typing
from typing import Any, Mapping
from typing import Any, Mapping, Optional

PARAMETERS_STR = "$parameters"

Expand Down Expand Up @@ -94,6 +94,7 @@ def propagate_types_and_parameters(
parent_field_identifier: str,
declarative_component: Mapping[str, Any],
parent_parameters: Mapping[str, Any],
use_parent_parameters: Optional[bool] = None,
) -> Mapping[str, Any]:
"""
Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the
Expand All @@ -103,6 +104,7 @@ def propagate_types_and_parameters(
:param declarative_component: The current component that is having type and parameters added
:param parent_field_identifier: The name of the field of the current component coming from the parent component
:param parent_parameters: The parameters set on parent components defined before the current component
:param use_parent_parameters: If set, parent parameters will be used as the source of truth when key names are the same
:return: A deep copy of the transformed component with types and parameters persisted to it
"""
propagated_component = dict(copy.deepcopy(declarative_component))
Expand Down Expand Up @@ -130,7 +132,11 @@ def propagate_types_and_parameters(
# level take precedence
current_parameters = dict(copy.deepcopy(parent_parameters))
component_parameters = propagated_component.pop(PARAMETERS_STR, {})
current_parameters = {**current_parameters, **component_parameters}
current_parameters = (
{**component_parameters, **current_parameters}
if use_parent_parameters
else {**current_parameters, **component_parameters}
)

# Parameters should be applied to the current component fields with the existing field taking precedence over parameters if
# both exist
Expand All @@ -145,7 +151,10 @@ def propagate_types_and_parameters(
excluded_parameter = current_parameters.pop(field_name, None)
parent_type_field_identifier = f"{propagated_component.get('type')}.{field_name}"
propagated_component[field_name] = self.propagate_types_and_parameters(
parent_type_field_identifier, field_value, current_parameters
parent_type_field_identifier,
field_value,
current_parameters,
use_parent_parameters=use_parent_parameters,
)
if excluded_parameter:
current_parameters[field_name] = excluded_parameter
Expand All @@ -158,7 +167,10 @@ def propagate_types_and_parameters(
f"{propagated_component.get('type')}.{field_name}"
)
field_value[i] = self.propagate_types_and_parameters(
parent_type_field_identifier, element, current_parameters
parent_type_field_identifier,
element,
current_parameters,
use_parent_parameters=use_parent_parameters,
)
if excluded_parameter:
current_parameters[field_name] = excluded_parameter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ def to_configured_catalog(
"type": "DynamicDeclarativeStream",
"stream_template": {
"type": "DeclarativeStream",
"name": "",
"$parameters": {
"name": "",
},
"primary_key": [],
"schema_loader": {
"type": "InlineSchemaLoader",
Expand Down Expand Up @@ -145,7 +147,9 @@ def to_configured_catalog(
"type": "DynamicDeclarativeStream",
"stream_template": {
"type": "DeclarativeStream",
"name": "",
"$parameters": {
"name": "",
},
"primary_key": [],
"schema_loader": {
"type": "InlineSchemaLoader",
Expand Down Expand Up @@ -231,7 +235,9 @@ def to_configured_catalog(
"type": "DynamicDeclarativeStream",
"stream_template": {
"type": "DeclarativeStream",
"name": "",
"$parameters": {
"name": "",
},
"primary_key": [],
"schema_loader": {
"type": "InlineSchemaLoader",
Expand Down Expand Up @@ -331,7 +337,7 @@ def to_configured_catalog(
"components_mapping": [
{
"type": "ComponentMappingDefinition",
"field_path": ["name"],
"field_path": ["$parameters", "name"],
"value": "parent_{{stream_slice['parent_id']}}_{{components_values['name']}}",
},
{
Expand Down Expand Up @@ -563,6 +569,10 @@ def test_dynamic_streams_with_http_components_resolver_retriever_with_parent_str
catalog=None,
state=None,
)
dynamic_streams = source._dynamic_stream_configs(source.resolved_manifest, _CONFIG)

assert len(dynamic_streams) == 4
assert dynamic_streams[0]["retriever"]["name"] == "parent_1_item_1"

actual_catalog = source.discover(logger=source.logger, config=_CONFIG)

Expand Down
Loading