@@ -313,20 +313,21 @@ async def backfill_satisfaction_ratings(
313
313
yield end
314
314
315
315
316
async def _fetch_top_level_fields(
    http: HTTPSession,
    url: str,
    params: dict[str, str | int],
    response_model: type[IncrementalCursorExportResponse],
    log: Logger,
) -> tuple[str | None, bool]:
    """Request one incremental cursor export page and return its paging fields.

    The body returned by ``http.request`` is decoded as UTF-8 and parsed with
    ``json.JSONDecoder().raw_decode`` rather than Pydantic's
    ``model_validate_json``; the original author chose this to reduce memory
    overhead when processing the response.

    ``response_model.model_construct`` is used to build the response object so
    that the resources inside a large body are not validated and transformed
    all at once. Only ``after_cursor`` and ``end_of_stream`` are read from it
    here.

    Args:
        http: Session used to perform the request.
        url: Endpoint to request.
        params: Query parameters sent with the request.
        response_model: Response class whose ``model_construct`` is called
            with the decoded top-level JSON object as keyword arguments.
        log: Logger handed through to ``http.request``.

    Returns:
        A ``(after_cursor, end_of_stream)`` tuple taken from the constructed
        response.
    """
    body_bytes = await http.request(log, url, params=params)
    body_text = body_bytes.decode('utf-8')

    # raw_decode returns (parsed_object, end_index); the index is not needed.
    decoder = json.JSONDecoder()
    payload, _end_index = decoder.raw_decode(body_text)

    # No validation happens here: model_construct only assembles the object.
    page = response_model.model_construct(**payload)
    return page.after_cursor, page.end_of_stream
330
331
331
332
332
333
async def _fetch_incremental_cursor_export_resources (
@@ -360,21 +361,13 @@ async def _fetch_incremental_cursor_export_resources(
360
361
params ["cursor" ] = _base64_encode (cursor )
361
362
362
363
while True :
363
- response = await _make_incremental_cursor_export_request (http , url , params , response_model , log )
364
-
365
- next_page_cursor = response .after_cursor
366
- end_of_stream = response .end_of_stream
367
-
368
- next_page_cursor = response .after_cursor
369
- end_of_stream = response .end_of_stream
364
+ next_page_cursor , end_of_stream = await _fetch_top_level_fields (http , url , params , response_model , log )
370
365
371
366
if next_page_cursor is None :
372
367
return
373
368
374
- for resource in response .resources :
375
- # Since _make_incremental_cursor_export_request does not validate & transform all resources at once,
376
- # we validate them one-by-one as they're yielded.
377
- yield TimestampedResource .model_validate (resource )
369
+ async for resource in http .request_object_stream (log , TimestampedResource , f"{ name } .item" , url , params = params ):
370
+ yield resource
378
371
379
372
yield _base64_decode (next_page_cursor )
380
373
0 commit comments