From 392098b36c1cab2cad999849c38e30c319251eb8 Mon Sep 17 00:00:00 2001
From: Alex Bair
Date: Sat, 15 Mar 2025 12:17:04 -0400
Subject: [PATCH] Revert "estuary-cdk: return a tuple of headers & a function
 to get the response body"

This reverts commit 33e17bb8fc825c8b82529dc396fa4990ae421636.
---
 estuary-cdk/estuary_cdk/http.py          | 78 ++++++-------------
 source-gladly/source_gladly/api.py       |  3 +-
 .../api.py                               |  4 +-
 .../source_google_sheets_native/api.py   |  4 +-
 .../source_shopify_native/api.py         |  4 +-
 .../source_zendesk_support_native/api.py |  6 +-
 6 files changed, 32 insertions(+), 67 deletions(-)

diff --git a/estuary-cdk/estuary_cdk/http.py b/estuary-cdk/estuary_cdk/http.py
index f2ebf1c580..b688f4eb23 100644
--- a/estuary-cdk/estuary_cdk/http.py
+++ b/estuary-cdk/estuary_cdk/http.py
@@ -2,7 +2,7 @@
 from logging import Logger
 from estuary_cdk.incremental_json_processor import Remainder
 from pydantic import BaseModel
-from typing import AsyncGenerator, Any, TypeVar, Union, Callable
+from typing import AsyncGenerator, Any, TypeVar
 import abc
 import aiohttp
 import asyncio
@@ -25,13 +25,6 @@
 StreamedObject = TypeVar("StreamedObject", bound=BaseModel)
 
 
-class Headers(dict[str, Any]):
-    pass
-
-
-BodyGeneratorFunction = Callable[[], AsyncGenerator[bytes, None]]
-HeadersAndBodyGenerator = tuple[Headers, BodyGeneratorFunction]
-
 
 class HTTPError(RuntimeError):
     """
@@ -76,11 +69,9 @@ async def request(
         """Request a url and return its body as bytes"""
 
         chunks: list[bytes] = []
-        _, body_generator = await self._request_stream(
+        async for chunk in self._request_stream(
             log, url, method, params, json, form, _with_token, headers
-        )
-
-        async for chunk in body_generator():
+        ):
             chunks.append(chunk)
 
         if len(chunks) == 0:
@@ -99,26 +90,22 @@ async def request_lines(
         json: dict[str, Any] | None = None,
         form: dict[str, Any] | None = None,
         delim: bytes = b"\n",
-        headers: dict[str, Any] = {}
-    ) -> tuple[Headers, BodyGeneratorFunction]:
+    ) -> AsyncGenerator[bytes, None]:
         """Request a url and return its response as streaming lines, as they arrive"""
 
-        headers, body = await self._request_stream(
-            log, url, method, params, json, form, True, headers
-        )
-
-        async def gen() -> AsyncGenerator[bytes, None]:
-            buffer = b""
-            async for chunk in body():
-                buffer += chunk
-                while delim in buffer:
-                    line, buffer = buffer.split(delim, 1)
-                    yield line
+        buffer = b""
+        async for chunk in self._request_stream(
+            log, url, method, params, json, form, True
+        ):
+            buffer += chunk
+            while delim in buffer:
+                line, buffer = buffer.split(delim, 1)
+                yield line
 
-            if buffer:
-                yield buffer
+        if buffer:
+            yield buffer
 
-        return (headers, gen)
+        return
 
     async def request_stream(
         self,
@@ -128,15 +115,13 @@ async def request_stream(
         params: dict[str, Any] | None = None,
         json: dict[str, Any] | None = None,
         form: dict[str, Any] | None = None,
-        headers: dict[str, Any] = {},
-    ) -> tuple[Headers, BodyGeneratorFunction]:
+    ) -> AsyncGenerator[bytes, None]:
         """Request a url and and return the raw response as a stream of bytes"""
 
-        headers, body = await self._request_stream(log, url, method, params, json, form, True, headers)
-        return (headers, body)
+        return self._request_stream(log, url, method, params, json, form, True)
 
     @abc.abstractmethod
-    async def _request_stream(
+    def _request_stream(
         self,
         log: Logger,
         url: str,
@@ -146,7 +131,7 @@ async def _request_stream(
         form: dict[str, Any] | None,
         _with_token: bool,
         headers: dict[str, Any] = {},
-    ) -> HeadersAndBodyGenerator: ...
+    ) -> AsyncGenerator[bytes, None]: ...
     # TODO(johnny): This is an unstable API.
     # It may need to accept request headers, or surface response headers,
@@ -330,7 +315,7 @@ async def _request_stream(
         form: dict[str, Any] | None,
         _with_token: bool,
         headers: dict[str, Any] = {},
-    ) -> HeadersAndBodyGenerator:
+    ) -> AsyncGenerator[bytes, None]:
         while True:
             cur_delay = self.rate_limiter.delay
             await asyncio.sleep(cur_delay)
@@ -345,17 +330,14 @@ async def _request_stream(
                 )
                 headers[self.token_source.authorization_header] = header_value
 
-            resp = await self.inner.request(
+            async with self.inner.request(
                 headers=headers,
                 json=json,
                 data=form,
                 method=method,
                 params=params,
                 url=url,
-            )
-
-            should_release_response = True
-            try:
+            ) as resp:
                 self.rate_limiter.update(cur_delay, resp.status == 429)
 
                 if resp.status == 429:
@@ -384,17 +366,7 @@ async def _request_stream(
                 else:
                     resp.raise_for_status()
 
-                async def body_generator() -> AsyncGenerator[bytes, None]:
-                    try:
-                        async for chunk in resp.content.iter_any():
-                            yield chunk
-                    finally:
-                        await resp.release()
-
-                headers = Headers({k: v for k, v in resp.headers.items()})
-                should_release_response = False
-                return (headers, body_generator)
+                async for chunk in resp.content.iter_any():
+                    yield chunk
 
-            finally:
-                if should_release_response:
-                    await resp.release()
+            return
diff --git a/source-gladly/source_gladly/api.py b/source-gladly/source_gladly/api.py
index 7e4cfb4744..2d625732e0 100644
--- a/source-gladly/source_gladly/api.py
+++ b/source-gladly/source_gladly/api.py
@@ -33,8 +33,7 @@ async def fetch_events(
     last_ts = log_cursor
     doc_count = 0
 
-    _, lines = await http.request_lines(log, url, params=params)
-    async for line in lines():
+    async for line in http.request_lines(log, url, params=params):
         event = Event.model_validate_json(line)
         if event.timestamp < last_ts:
             # Events must be in ascending order with respect to time, so this is an application
diff --git a/source-google-analytics-data-api-native/source_google_analytics_data_api_native/api.py b/source-google-analytics-data-api-native/source_google_analytics_data_api_native/api.py
index 4dc02895a6..d519c9a86b 100644
--- a/source-google-analytics-data-api-native/source_google_analytics_data_api_native/api.py
+++ b/source-google-analytics-data-api-native/source_google_analytics_data_api_native/api.py
@@ -128,10 +128,8 @@ async def _paginate_through_report_results(
     while True:
         body = _build_report_body(date, report, offset)
 
-        _, response_body = await http.request_stream(log, url, method="POST", json=body)
-
         processor = IncrementalJsonProcessor(
-            response_body(),
+            await http.request_stream(log, url, method="POST", json=body),
             f"rows.item",
             Row,
             RunReportResponse,
diff --git a/source-google-sheets-native/source_google_sheets_native/api.py b/source-google-sheets-native/source_google_sheets_native/api.py
index 457c1308f4..a430614bad 100644
--- a/source-google-sheets-native/source_google_sheets_native/api.py
+++ b/source-google-sheets-native/source_google_sheets_native/api.py
@@ -58,10 +58,8 @@ async def fetch_rows(
 
     params["fields"] = "sheets.data(rowData.values(effectiveFormat(numberFormat(type)),effectiveValue))"
 
-    _, body = await http.request_stream(log, url, params=params)
-
     async for row in IncrementalJsonProcessor(
-        body(),
+        await http.request_stream(log, url, params=params),
         "sheets.item.data.item.rowData.item",
         RowData,
     ):
diff --git a/source-shopify-native/source_shopify_native/api.py b/source-shopify-native/source_shopify_native/api.py
index ca0b8f51f7..a964cc294b 100644
--- a/source-shopify-native/source_shopify_native/api.py
+++ b/source-shopify-native/source_shopify_native/api.py
@@ -34,8 +34,8 @@ async def fetch_products(
 
     last_seen_dt = log_cursor
 
-    _, lines = await http.request_lines(log, url)
-    async for record in products.process_result(log, lines()):
+    lines = http.request_lines(log, url)
+    async for record in products.process_result(log, lines):
         product = TimestampedResource.model_validate(record)
 
         if product.updatedAt > last_seen_dt:
diff --git a/source-zendesk-support-native/source_zendesk_support_native/api.py b/source-zendesk-support-native/source_zendesk_support_native/api.py
index 2a54b2f4f8..1c2e0dfacc 100644
--- a/source-zendesk-support-native/source_zendesk_support_native/api.py
+++ b/source-zendesk-support-native/source_zendesk_support_native/api.py
@@ -440,9 +440,8 @@ async def _fetch_incremental_time_export_resources(
 
     while True:
         async with incremental_time_export_api_lock:
-            _, body = await http.request_stream(log, url, params=params)
             processor = IncrementalJsonProcessor(
-                body(),
+                await http.request_stream(log, url, params=params),
                 f"{name}.item",
                 TimestampedResource,
                 response_model,
@@ -569,9 +568,8 @@ async def _fetch_incremental_cursor_export_resources(
         params["cursor"] = _base64_encode(cursor)
 
     while True:
-        _, body = await http.request_stream(log, url, params=params)
         processor = IncrementalJsonProcessor(
-            body(),
+            await http.request_stream(log, url, params=params),
            f"{name}.item",
             TimestampedResource,
             response_model,
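
Reviewer note, not part of the patch: the revert swaps the "(headers,
body-generator)" tuple shape back to plain async generators. A minimal sketch
of the two call shapes, using hypothetical stand-ins (`old_request_lines` and
`new_request_lines` are illustrative only, not code from this commit):

    import asyncio
    from typing import Any, AsyncGenerator, Callable

    Headers = dict[str, Any]
    BodyGeneratorFunction = Callable[[], AsyncGenerator[bytes, None]]

    # Restored shape: an async generator that callers iterate directly.
    async def new_request_lines(url: str) -> AsyncGenerator[bytes, None]:
        for line in (b"first", b"second"):
            yield line

    # Reverted-away shape: headers are available before the body streams.
    async def old_request_lines(url: str) -> tuple[Headers, BodyGeneratorFunction]:
        async def gen() -> AsyncGenerator[bytes, None]:
            for line in (b"first", b"second"):
                yield line
        return ({"content-type": "text/plain"}, gen)

    async def main() -> None:
        # After the revert: no await; just async-for over the generator.
        async for line in new_request_lines("https://example.invalid"):
            print(line)

        # Before the revert: await the tuple, optionally inspect headers,
        # then call the generator function to stream the body.
        headers, lines = await old_request_lines("https://example.invalid")
        print(headers["content-type"])
        async for line in lines():
            print(line)

    asyncio.run(main())

This mirrors why each call site above drops the "_, lines = await ..."
unpacking: request_lines and request_stream once again hand back the stream
itself rather than a (headers, generator) pair.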