estuary-cdk: re-add support for surfacing headers #2542

Merged · 1 commit · Mar 18, 2025
78 changes: 53 additions & 25 deletions estuary-cdk/estuary_cdk/http.py
@@ -2,7 +2,7 @@
from logging import Logger
from estuary_cdk.incremental_json_processor import Remainder
from pydantic import BaseModel
from typing import AsyncGenerator, Any, TypeVar
from typing import AsyncGenerator, Any, TypeVar, Union, Callable
import abc
import aiohttp
import asyncio
@@ -25,6 +25,13 @@

StreamedObject = TypeVar("StreamedObject", bound=BaseModel)

class Headers(dict[str, Any]):
pass


BodyGeneratorFunction = Callable[[], AsyncGenerator[bytes, None]]
HeadersAndBodyGenerator = tuple[Headers, BodyGeneratorFunction]


class HTTPError(RuntimeError):
"""
@@ -69,9 +76,11 @@ async def request(
"""Request a url and return its body as bytes"""

chunks: list[bytes] = []
async for chunk in self._request_stream(
_, body_generator = await self._request_stream(
log, url, method, params, json, form, _with_token, headers
):
)

async for chunk in body_generator():
chunks.append(chunk)

if len(chunks) == 0:
@@ -90,22 +99,26 @@ async def request_lines(
json: dict[str, Any] | None = None,
form: dict[str, Any] | None = None,
delim: bytes = b"\n",
) -> AsyncGenerator[bytes, None]:
headers: dict[str, Any] = {}
) -> tuple[Headers, BodyGeneratorFunction]:
"""Request a url and return its response as streaming lines, as they arrive"""

buffer = b""
async for chunk in self._request_stream(
log, url, method, params, json, form, True
):
buffer += chunk
while delim in buffer:
line, buffer = buffer.split(delim, 1)
yield line
headers, body = await self._request_stream(
log, url, method, params, json, form, True, headers
)

async def gen() -> AsyncGenerator[bytes, None]:
buffer = b""
async for chunk in body():
buffer += chunk
while delim in buffer:
line, buffer = buffer.split(delim, 1)
yield line

if buffer:
yield buffer
if buffer:
yield buffer

return
return (headers, gen)

async def request_stream(
self,
@@ -115,13 +128,15 @@ async def request_stream(
params: dict[str, Any] | None = None,
json: dict[str, Any] | None = None,
form: dict[str, Any] | None = None,
) -> AsyncGenerator[bytes, None]:
headers: dict[str, Any] = {},
) -> tuple[Headers, BodyGeneratorFunction]:
"""Request a url and and return the raw response as a stream of bytes"""

return self._request_stream(log, url, method, params, json, form, True)
headers, body = await self._request_stream(log, url, method, params, json, form, True, headers)
return (headers, body)

@abc.abstractmethod
def _request_stream(
async def _request_stream(
self,
log: Logger,
url: str,
@@ -131,7 +146,7 @@ def _request_stream(
form: dict[str, Any] | None,
_with_token: bool,
headers: dict[str, Any] = {},
) -> AsyncGenerator[bytes, None]: ...
) -> HeadersAndBodyGenerator: ...

# TODO(johnny): This is an unstable API.
# It may need to accept request headers, or surface response headers,
@@ -315,7 +330,7 @@ async def _request_stream(
form: dict[str, Any] | None,
_with_token: bool,
headers: dict[str, Any] = {},
) -> AsyncGenerator[bytes, None]:
) -> HeadersAndBodyGenerator:
while True:
cur_delay = self.rate_limiter.delay
await asyncio.sleep(cur_delay)
@@ -330,14 +345,17 @@ async def _request_stream(
)
headers[self.token_source.authorization_header] = header_value

async with self.inner.request(
resp = await self.inner.request(
headers=headers,
json=json,
data=form,
method=method,
params=params,
url=url,
) as resp:
)

should_release_response = True
try:
self.rate_limiter.update(cur_delay, resp.status == 429)

if resp.status == 429:
@@ -366,7 +384,17 @@
else:
resp.raise_for_status()

async for chunk in resp.content.iter_any():
yield chunk
async def body_generator() -> AsyncGenerator[bytes, None]:
try:
async for chunk in resp.content.iter_any():
yield chunk
finally:
await resp.release()

headers = Headers({k: v for k, v in resp.headers.items()})
should_release_response = False
return (headers, body_generator)

return
finally:
if should_release_response:
await resp.release()
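
For reference, a minimal sketch of the consumer side of this change, assuming the session type is the HTTPSession class from this module and that a Logger is already available; the URL and the Content-Type header below are illustrative only:

from logging import Logger

from estuary_cdk.http import HTTPSession


async def fetch_with_headers(http: HTTPSession, log: Logger) -> bytes:
    # request_stream now returns the response headers together with a
    # callable that yields the body as an async stream of bytes.
    headers, body = await http.request_stream(log, "https://example.com/export")

    # Headers can be inspected before any of the body is consumed.
    log.debug(f"content type: {headers.get('Content-Type')}")

    chunks: list[bytes] = []
    async for chunk in body():
        chunks.append(chunk)

    return b"".join(chunks)

Returning the body as a callable generator, rather than yielding bytes directly, is what lets headers be surfaced before the body is read, while the try/finally inside body_generator still guarantees the underlying aiohttp response is released once the stream is exhausted.
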
3 changes: 2 additions & 1 deletion source-gladly/source_gladly/api.py
@@ -33,7 +33,8 @@ async def fetch_events(

last_ts = log_cursor
doc_count = 0
async for line in http.request_lines(log, url, params=params):
_, lines = await http.request_lines(log, url, params=params)
async for line in lines():
event = Event.model_validate_json(line)
if event.timestamp < last_ts:
# Events must be in ascending order with respect to time, so this is an application
Changes in another file (not named in this view):
@@ -128,8 +128,10 @@ async def _paginate_through_report_results(
while True:
body = _build_report_body(date, report, offset)

_, response_body = await http.request_stream(log, url, method="POST", json=body)

processor = IncrementalJsonProcessor(
await http.request_stream(log, url, method="POST", json=body),
response_body(),
f"rows.item",
Row,
RunReportResponse,
Changes in another file (not named in this view):
@@ -58,8 +58,10 @@ async def fetch_rows(

params["fields"] = "sheets.data(rowData.values(effectiveFormat(numberFormat(type)),effectiveValue))"

_, body = await http.request_stream(log, url, params=params)

async for row in IncrementalJsonProcessor(
await http.request_stream(log, url, params=params),
body(),
"sheets.item.data.item.rowData.item",
RowData,
):
4 changes: 2 additions & 2 deletions source-shopify-native/source_shopify_native/api.py
@@ -34,8 +34,8 @@ async def fetch_products(

last_seen_dt = log_cursor

lines = http.request_lines(log, url)
async for record in products.process_result(log, lines):
_, lines = await http.request_lines(log, url)
async for record in products.process_result(log, lines()):
product = TimestampedResource.model_validate(record)

if product.updatedAt > last_seen_dt:
Changes in another file (not named in this view):
@@ -440,8 +440,9 @@ async def _fetch_incremental_time_export_resources(

while True:
async with incremental_time_export_api_lock:
_, body = await http.request_stream(log, url, params=params)
processor = IncrementalJsonProcessor(
await http.request_stream(log, url, params=params),
body(),
f"{name}.item",
TimestampedResource,
response_model,
@@ -568,8 +569,9 @@ async def _fetch_incremental_cursor_export_resources(
params["cursor"] = _base64_encode(cursor)

while True:
_, body = await http.request_stream(log, url, params=params)
processor = IncrementalJsonProcessor(
await http.request_stream(log, url, params=params),
body(),
f"{name}.item",
TimestampedResource,
response_model,