Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

estuary-cdk: revert 33e17bb8fc825c8b82529dc396fa4990ae421636 #2538

Merged
merged 1 commit into from
Mar 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 25 additions & 53 deletions estuary-cdk/estuary_cdk/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from logging import Logger
from estuary_cdk.incremental_json_processor import Remainder
from pydantic import BaseModel
from typing import AsyncGenerator, Any, TypeVar, Union, Callable
from typing import AsyncGenerator, Any, TypeVar
import abc
import aiohttp
import asyncio
Expand All @@ -25,13 +25,6 @@

StreamedObject = TypeVar("StreamedObject", bound=BaseModel)

class Headers(dict[str, Any]):
pass


BodyGeneratorFunction = Callable[[], AsyncGenerator[bytes, None]]
HeadersAndBodyGenerator = tuple[Headers, BodyGeneratorFunction]


class HTTPError(RuntimeError):
"""
Expand Down Expand Up @@ -76,11 +69,9 @@ async def request(
"""Request a url and return its body as bytes"""

chunks: list[bytes] = []
_, body_generator = await self._request_stream(
async for chunk in self._request_stream(
log, url, method, params, json, form, _with_token, headers
)

async for chunk in body_generator():
):
chunks.append(chunk)

if len(chunks) == 0:
Expand All @@ -99,26 +90,22 @@ async def request_lines(
json: dict[str, Any] | None = None,
form: dict[str, Any] | None = None,
delim: bytes = b"\n",
headers: dict[str, Any] = {}
) -> tuple[Headers, BodyGeneratorFunction]:
) -> AsyncGenerator[bytes, None]:
"""Request a url and return its response as streaming lines, as they arrive"""

headers, body = await self._request_stream(
log, url, method, params, json, form, True, headers
)

async def gen() -> AsyncGenerator[bytes, None]:
buffer = b""
async for chunk in body():
buffer += chunk
while delim in buffer:
line, buffer = buffer.split(delim, 1)
yield line
buffer = b""
async for chunk in self._request_stream(
log, url, method, params, json, form, True
):
buffer += chunk
while delim in buffer:
line, buffer = buffer.split(delim, 1)
yield line

if buffer:
yield buffer
if buffer:
yield buffer

return (headers, gen)
return

async def request_stream(
self,
Expand All @@ -128,15 +115,13 @@ async def request_stream(
params: dict[str, Any] | None = None,
json: dict[str, Any] | None = None,
form: dict[str, Any] | None = None,
headers: dict[str, Any] = {},
) -> tuple[Headers, BodyGeneratorFunction]:
) -> AsyncGenerator[bytes, None]:
"""Request a url and return the raw response as a stream of bytes"""

headers, body = await self._request_stream(log, url, method, params, json, form, True, headers)
return (headers, body)
return self._request_stream(log, url, method, params, json, form, True)

@abc.abstractmethod
async def _request_stream(
def _request_stream(
self,
log: Logger,
url: str,
Expand All @@ -146,7 +131,7 @@ async def _request_stream(
form: dict[str, Any] | None,
_with_token: bool,
headers: dict[str, Any] = {},
) -> HeadersAndBodyGenerator: ...
) -> AsyncGenerator[bytes, None]: ...

# TODO(johnny): This is an unstable API.
# It may need to accept request headers, or surface response headers,
Expand Down Expand Up @@ -330,7 +315,7 @@ async def _request_stream(
form: dict[str, Any] | None,
_with_token: bool,
headers: dict[str, Any] = {},
) -> HeadersAndBodyGenerator:
) -> AsyncGenerator[bytes, None]:
while True:
cur_delay = self.rate_limiter.delay
await asyncio.sleep(cur_delay)
Expand All @@ -345,17 +330,14 @@ async def _request_stream(
)
headers[self.token_source.authorization_header] = header_value

resp = await self.inner.request(
async with self.inner.request(
headers=headers,
json=json,
data=form,
method=method,
params=params,
url=url,
)

should_release_response = True
try:
) as resp:
self.rate_limiter.update(cur_delay, resp.status == 429)

if resp.status == 429:
Expand Down Expand Up @@ -384,17 +366,7 @@ async def _request_stream(
else:
resp.raise_for_status()

async def body_generator() -> AsyncGenerator[bytes, None]:
try:
async for chunk in resp.content.iter_any():
yield chunk
finally:
await resp.release()

headers = Headers({k: v for k, v in resp.headers.items()})
should_release_response = False
return (headers, body_generator)
async for chunk in resp.content.iter_any():
yield chunk

finally:
if should_release_response:
await resp.release()
return
3 changes: 1 addition & 2 deletions source-gladly/source_gladly/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ async def fetch_events(

last_ts = log_cursor
doc_count = 0
_, lines = await http.request_lines(log, url, params=params)
async for line in lines():
async for line in http.request_lines(log, url, params=params):
event = Event.model_validate_json(line)
if event.timestamp < last_ts:
# Events must be in ascending order with respect to time, so this is an application
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,8 @@ async def _paginate_through_report_results(
while True:
body = _build_report_body(date, report, offset)

_, response_body = await http.request_stream(log, url, method="POST", json=body)

processor = IncrementalJsonProcessor(
response_body(),
await http.request_stream(log, url, method="POST", json=body),
f"rows.item",
Row,
RunReportResponse,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,8 @@ async def fetch_rows(

params["fields"] = "sheets.data(rowData.values(effectiveFormat(numberFormat(type)),effectiveValue))"

_, body = await http.request_stream(log, url, params=params)

async for row in IncrementalJsonProcessor(
body(),
await http.request_stream(log, url, params=params),
"sheets.item.data.item.rowData.item",
RowData,
):
Expand Down
4 changes: 2 additions & 2 deletions source-shopify-native/source_shopify_native/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ async def fetch_products(

last_seen_dt = log_cursor

_, lines = await http.request_lines(log, url)
async for record in products.process_result(log, lines()):
lines = http.request_lines(log, url)
async for record in products.process_result(log, lines):
product = TimestampedResource.model_validate(record)

if product.updatedAt > last_seen_dt:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,9 +440,8 @@ async def _fetch_incremental_time_export_resources(

while True:
async with incremental_time_export_api_lock:
_, body = await http.request_stream(log, url, params=params)
processor = IncrementalJsonProcessor(
body(),
await http.request_stream(log, url, params=params),
f"{name}.item",
TimestampedResource,
response_model,
Expand Down Expand Up @@ -569,9 +568,8 @@ async def _fetch_incremental_cursor_export_resources(
params["cursor"] = _base64_encode(cursor)

while True:
_, body = await http.request_stream(log, url, params=params)
processor = IncrementalJsonProcessor(
body(),
await http.request_stream(log, url, params=params),
f"{name}.item",
TimestampedResource,
response_model,
Expand Down
Loading