Skip to content

Commit

Permalink
source-zendesk-support-native: reduce memory usage by streaming incre…
Browse files Browse the repository at this point in the history
…mental export resources

Using the new `http.request_object_stream` to stream incremental export
resources signficantly reduces the memory usage of the connector, even
when using the max page size of 1000.
  • Loading branch information
Alex-Bair committed Jan 30, 2025
1 parent 4bab06f commit 4d8184f
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 14 deletions.
104 changes: 104 additions & 0 deletions source-zendesk-support-native/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 7 additions & 14 deletions source-zendesk-support-native/source_zendesk_support_native/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,20 +313,21 @@ async def backfill_satisfaction_ratings(
yield end


async def _make_incremental_cursor_export_request(
async def _fetch_top_level_fields(
http: HTTPSession,
url: str,
params: dict[str, str | int],
response_model: type[IncrementalCursorExportResponse],
log: Logger,
) -> IncrementalCursorExportResponse:
) -> tuple[str | None, bool]:
# Instead of using Pydantic's model_validate_json that uses json.loads internally,
# use json.JSONDecoder().raw_decode to reduce memory overhead when processing the response.
raw_response_bytes = await http.request(log, url, params=params)
obj, _ = json.JSONDecoder().raw_decode(raw_response_bytes.decode('utf-8'))
# model_construct is used to avoid validating & transforming all resources within large response bodies at once
# to reduce memory overhead. Instead, resources are validated & transformed one-by-one as they are yielded.
return response_model.model_construct(**obj)
response = response_model.model_construct(**obj)
return (response.after_cursor, response.end_of_stream)


async def _fetch_incremental_cursor_export_resources(
Expand Down Expand Up @@ -360,21 +361,13 @@ async def _fetch_incremental_cursor_export_resources(
params["cursor"] = _base64_encode(cursor)

while True:
response = await _make_incremental_cursor_export_request(http, url, params, response_model, log)

next_page_cursor = response.after_cursor
end_of_stream = response.end_of_stream

next_page_cursor = response.after_cursor
end_of_stream = response.end_of_stream
next_page_cursor, end_of_stream = await _fetch_top_level_fields(http, url, params, response_model, log)

if next_page_cursor is None:
return

for resource in response.resources:
# Since _make_incremental_cursor_export_request does not validate & transform all resources at once,
# we validate them one-by-one as they're yielded.
yield TimestampedResource.model_validate(resource)
async for resource in http.request_object_stream(log, TimestampedResource, f"{name}.item", url, params=params):
yield resource

yield _base64_decode(next_page_cursor)

Expand Down

0 comments on commit 4d8184f

Please sign in to comment.