Skip to content

Commit 9b0d10f

Browse files
committed
Revert "estuary-cdk: return a tuple of headers & a function to get the response body"
This reverts commit 33e17bb.
1 parent 318c23f commit 9b0d10f

File tree

6 files changed

+32
-67
lines changed
  • estuary-cdk/estuary_cdk
  • source-gladly/source_gladly
  • source-google-analytics-data-api-native/source_google_analytics_data_api_native
  • source-google-sheets-native/source_google_sheets_native
  • source-shopify-native/source_shopify_native
  • source-zendesk-support-native/source_zendesk_support_native

6 files changed

+32
-67
lines changed

estuary-cdk/estuary_cdk/http.py

+25-53
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from logging import Logger
33
from estuary_cdk.incremental_json_processor import Remainder
44
from pydantic import BaseModel
5-
from typing import AsyncGenerator, Any, TypeVar, Union, Callable
5+
from typing import AsyncGenerator, Any, TypeVar
66
import abc
77
import aiohttp
88
import asyncio
@@ -25,13 +25,6 @@
2525

2626
StreamedObject = TypeVar("StreamedObject", bound=BaseModel)
2727

28-
class Headers(dict[str, Any]):
29-
pass
30-
31-
32-
BodyGeneratorFunction = Callable[[], AsyncGenerator[bytes, None]]
33-
HeadersAndBodyGenerator = tuple[Headers, BodyGeneratorFunction]
34-
3528

3629
class HTTPError(RuntimeError):
3730
"""
@@ -76,11 +69,9 @@ async def request(
7669
"""Request a url and return its body as bytes"""
7770

7871
chunks: list[bytes] = []
79-
_, body_generator = await self._request_stream(
72+
async for chunk in self._request_stream(
8073
log, url, method, params, json, form, _with_token, headers
81-
)
82-
83-
async for chunk in body_generator():
74+
):
8475
chunks.append(chunk)
8576

8677
if len(chunks) == 0:
@@ -99,26 +90,22 @@ async def request_lines(
9990
json: dict[str, Any] | None = None,
10091
form: dict[str, Any] | None = None,
10192
delim: bytes = b"\n",
102-
headers: dict[str, Any] = {}
103-
) -> tuple[Headers, BodyGeneratorFunction]:
93+
) -> AsyncGenerator[bytes, None]:
10494
"""Request a url and return its response as streaming lines, as they arrive"""
10595

106-
headers, body = await self._request_stream(
107-
log, url, method, params, json, form, True, headers
108-
)
109-
110-
async def gen() -> AsyncGenerator[bytes, None]:
111-
buffer = b""
112-
async for chunk in body():
113-
buffer += chunk
114-
while delim in buffer:
115-
line, buffer = buffer.split(delim, 1)
116-
yield line
96+
buffer = b""
97+
async for chunk in self._request_stream(
98+
log, url, method, params, json, form, True
99+
):
100+
buffer += chunk
101+
while delim in buffer:
102+
line, buffer = buffer.split(delim, 1)
103+
yield line
117104

118-
if buffer:
119-
yield buffer
105+
if buffer:
106+
yield buffer
120107

121-
return (headers, gen)
108+
return
122109

123110
async def request_stream(
124111
self,
@@ -128,15 +115,13 @@ async def request_stream(
128115
params: dict[str, Any] | None = None,
129116
json: dict[str, Any] | None = None,
130117
form: dict[str, Any] | None = None,
131-
headers: dict[str, Any] = {},
132-
) -> tuple[Headers, BodyGeneratorFunction]:
118+
) -> AsyncGenerator[bytes, None]:
133119
"""Request a url and and return the raw response as a stream of bytes"""
134120

135-
headers, body = await self._request_stream(log, url, method, params, json, form, True, headers)
136-
return (headers, body)
121+
return self._request_stream(log, url, method, params, json, form, True)
137122

138123
@abc.abstractmethod
139-
async def _request_stream(
124+
def _request_stream(
140125
self,
141126
log: Logger,
142127
url: str,
@@ -146,7 +131,7 @@ async def _request_stream(
146131
form: dict[str, Any] | None,
147132
_with_token: bool,
148133
headers: dict[str, Any] = {},
149-
) -> HeadersAndBodyGenerator: ...
134+
) -> AsyncGenerator[bytes, None]: ...
150135

151136
# TODO(johnny): This is an unstable API.
152137
# It may need to accept request headers, or surface response headers,
@@ -330,7 +315,7 @@ async def _request_stream(
330315
form: dict[str, Any] | None,
331316
_with_token: bool,
332317
headers: dict[str, Any] = {},
333-
) -> HeadersAndBodyGenerator:
318+
) -> AsyncGenerator[bytes, None]:
334319
while True:
335320
cur_delay = self.rate_limiter.delay
336321
await asyncio.sleep(cur_delay)
@@ -345,17 +330,14 @@ async def _request_stream(
345330
)
346331
headers[self.token_source.authorization_header] = header_value
347332

348-
resp = await self.inner.request(
333+
async with self.inner.request(
349334
headers=headers,
350335
json=json,
351336
data=form,
352337
method=method,
353338
params=params,
354339
url=url,
355-
)
356-
357-
should_release_response = True
358-
try:
340+
) as resp:
359341
self.rate_limiter.update(cur_delay, resp.status == 429)
360342

361343
if resp.status == 429:
@@ -384,17 +366,7 @@ async def _request_stream(
384366
else:
385367
resp.raise_for_status()
386368

387-
async def body_generator() -> AsyncGenerator[bytes, None]:
388-
try:
389-
async for chunk in resp.content.iter_any():
390-
yield chunk
391-
finally:
392-
await resp.release()
393-
394-
headers = Headers({k: v for k, v in resp.headers.items()})
395-
should_release_response = False
396-
return (headers, body_generator)
369+
async for chunk in resp.content.iter_any():
370+
yield chunk
397371

398-
finally:
399-
if should_release_response:
400-
await resp.release()
372+
return

source-gladly/source_gladly/api.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,7 @@ async def fetch_events(
3333

3434
last_ts = log_cursor
3535
doc_count = 0
36-
_, lines = await http.request_lines(log, url, params=params)
37-
async for line in lines():
36+
async for line in http.request_lines(log, url, params=params):
3837
event = Event.model_validate_json(line)
3938
if event.timestamp < last_ts:
4039
# Events must be in ascending order with respect to time, so this is an application

source-google-analytics-data-api-native/source_google_analytics_data_api_native/api.py

+1-3
Original file line numberDiff line numberDiff line change
@@ -128,10 +128,8 @@ async def _paginate_through_report_results(
128128
while True:
129129
body = _build_report_body(date, report, offset)
130130

131-
_, response_body = await http.request_stream(log, url, method="POST", json=body)
132-
133131
processor = IncrementalJsonProcessor(
134-
response_body(),
132+
await http.request_stream(log, url, method="POST", json=body),
135133
f"rows.item",
136134
Row,
137135
RunReportResponse,

source-google-sheets-native/source_google_sheets_native/api.py

+1-3
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,8 @@ async def fetch_rows(
5858

5959
params["fields"] = "sheets.data(rowData.values(effectiveFormat(numberFormat(type)),effectiveValue))"
6060

61-
_, body = await http.request_stream(log, url, params=params)
62-
6361
async for row in IncrementalJsonProcessor(
64-
body(),
62+
await http.request_stream(log, url, params=params),
6563
"sheets.item.data.item.rowData.item",
6664
RowData,
6765
):

source-shopify-native/source_shopify_native/api.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ async def fetch_products(
3434

3535
last_seen_dt = log_cursor
3636

37-
_, lines = await http.request_lines(log, url)
38-
async for record in products.process_result(log, lines()):
37+
lines = http.request_lines(log, url)
38+
async for record in products.process_result(log, lines):
3939
product = TimestampedResource.model_validate(record)
4040

4141
if product.updatedAt > last_seen_dt:

source-zendesk-support-native/source_zendesk_support_native/api.py

+2-4
Original file line numberDiff line numberDiff line change
@@ -440,9 +440,8 @@ async def _fetch_incremental_time_export_resources(
440440

441441
while True:
442442
async with incremental_time_export_api_lock:
443-
_, body = await http.request_stream(log, url, params=params)
444443
processor = IncrementalJsonProcessor(
445-
body(),
444+
await http.request_stream(log, url, params=params),
446445
f"{name}.item",
447446
TimestampedResource,
448447
response_model,
@@ -569,9 +568,8 @@ async def _fetch_incremental_cursor_export_resources(
569568
params["cursor"] = _base64_encode(cursor)
570569

571570
while True:
572-
_, body = await http.request_stream(log, url, params=params)
573571
processor = IncrementalJsonProcessor(
574-
body(),
572+
await http.request_stream(log, url, params=params),
575573
f"{name}.item",
576574
TimestampedResource,
577575
response_model,

0 commit comments

Comments
 (0)