Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 31 additions & 2 deletions sources/rest_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ def rest_api_resources(config: RESTAPIConfig):
request_params = endpoint_config.get("params", {})
resource_config = endpoint_config.get("resource", {})

include_from_parent: list[str] = resource_config.pop("include_from_parent", [])

incremental_object, incremental_param = setup_incremental_object(
request_params, endpoint_config.get("incremental")
)
Expand Down Expand Up @@ -235,13 +237,25 @@ def paginate_dependent_resource(
for item in items:
formatted_path = path.format(**{param_name: item[field_path]})

yield from client.paginate(
child_results = client.paginate(
method=method,
path=formatted_path,
params=params,
paginator=paginator,
)

parent_resource_name = resolved_param.resolve_config.resource_name
for r in child_results:
if r:
yield _add_from_parent(
r,
item,
include_from_parent,
parent_resource_name,
)
else:
yield r

resources[resource_name] = dlt.resource(
paginate_dependent_resource,
name=resource_name,
Expand All @@ -257,6 +271,22 @@ def paginate_dependent_resource(
return list(resources.values())


def _add_from_parent(
child_records,
parent_record,
include_from_parent,
parent_resource_name,
):
"""allows dependent resource to include parent resource values
which are not in the response of the child resource"""
for child in child_records:
for parent_field in include_from_parent:
field_from_parent = f"_{parent_resource_name}_{parent_field}"
if field_from_parent not in child:
child[field_from_parent] = parent_record[parent_field]
return child_records


#
# Alternative implementation
#
Expand Down Expand Up @@ -429,4 +459,3 @@ def paginate_dependent_resource(
)

return list(dlt_resources.values())