Skip to content

Commit 682521a

Browse files
committed
forefront update
1 parent a1da132 commit 682521a

File tree

2 files changed

+48
-23
lines changed

2 files changed

+48
-23
lines changed

src/crawlee/storage_clients/_file_system/_request_queue_client.py

Lines changed: 47 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -429,17 +429,54 @@ async def fetch_next_request(self) -> Request | None:
429429
# Create the requests directory if it doesn't exist
430430
await asyncio.to_thread(self.path_to_rq.mkdir, parents=True, exist_ok=True)
431431

432-
# List all request files
433-
request_files = await asyncio.to_thread(list, self.path_to_rq.glob('*.json'))
432+
# First check forefront requests in the exact order they were added
433+
for request_id in list(self._forefront_requests):
434+
# Skip if already in progress
435+
if request_id in self._in_progress:
436+
continue
437+
438+
request_path = self.path_to_rq / f'{request_id}.json'
439+
440+
# Skip if file doesn't exist
441+
if not await asyncio.to_thread(request_path.exists):
442+
self._forefront_requests.remove(request_id)
443+
continue
444+
445+
file = await asyncio.to_thread(open, request_path)
446+
try:
447+
file_content = json.load(file)
448+
# Skip if already handled
449+
if file_content.get('handled_at') is not None:
450+
self._forefront_requests.remove(request_id)
451+
continue
452+
453+
# Create request object
454+
request = Request(**file_content)
455+
456+
# Mark as in-progress in memory
457+
self._in_progress.add(request.id)
434458

435-
# First check for forefront requests
436-
forefront_requests = []
459+
# Remove from forefront list
460+
self._forefront_requests.remove(request.id)
461+
462+
# Update accessed timestamp
463+
await self._update_metadata(update_accessed_at=True)
464+
except (json.JSONDecodeError, ValidationError) as exc:
465+
logger.warning(f'Failed to parse request file {request_path}: {exc!s}')
466+
self._forefront_requests.remove(request_id)
467+
else:
468+
return request
469+
finally:
470+
await asyncio.to_thread(file.close)
471+
472+
# List all request files for regular (non-forefront) requests
473+
request_files = await asyncio.to_thread(list, self.path_to_rq.glob('*.json'))
437474
regular_requests = []
438475

439476
# Get file creation times for sorting regular requests in FIFO order
440477
request_file_times = {}
441478

442-
# Separate requests into forefront and regular
479+
# Filter out metadata files and in-progress requests
443480
for request_file in request_files:
444481
# Skip metadata file
445482
if request_file.name == METADATA_FILENAME:
@@ -448,8 +485,8 @@ async def fetch_next_request(self) -> Request | None:
448485
# Extract request ID from filename
449486
request_id = request_file.stem
450487

451-
# Skip if already in progress
452-
if request_id in self._in_progress:
488+
# Skip if already in progress or in forefront
489+
if request_id in self._in_progress or request_id in self._forefront_requests:
453490
continue
454491

455492
# Get file creation/modification time for FIFO ordering
@@ -460,21 +497,13 @@ async def fetch_next_request(self) -> Request | None:
460497
# If we can't get the time, use 0 (oldest)
461498
request_file_times[request_file] = 0
462499

463-
if request_id in self._forefront_requests:
464-
forefront_requests.append(request_file)
465-
else:
466-
regular_requests.append(request_file)
500+
regular_requests.append(request_file)
467501

468502
# Sort regular requests by creation time (FIFO order)
469503
regular_requests.sort(key=lambda f: request_file_times[f])
470504

471-
# Prioritize forefront requests
472-
prioritized_files = forefront_requests + regular_requests
473-
474-
# Process files in prioritized order
475-
for request_file in prioritized_files:
476-
request_id = request_file.stem
477-
505+
# Process regular requests in FIFO order
506+
for request_file in regular_requests:
478507
file = await asyncio.to_thread(open, request_file)
479508
try:
480509
file_content = json.load(file)
@@ -488,13 +517,8 @@ async def fetch_next_request(self) -> Request | None:
488517
# Mark as in-progress in memory
489518
self._in_progress.add(request.id)
490519

491-
# Remove from forefront list if it was there
492-
if request.id in self._forefront_requests:
493-
self._forefront_requests.remove(request.id)
494-
495520
# Update accessed timestamp
496521
await self._update_metadata(update_accessed_at=True)
497-
498522
except (json.JSONDecodeError, ValidationError) as exc:
499523
logger.warning(f'Failed to parse request file {request_file}: {exc!s}')
500524
else:

tests/unit/crawlers/_basic/test_basic_crawler.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -543,6 +543,7 @@ async def test_crawler_get_storages() -> None:
543543
assert isinstance(kvs, KeyValueStore)
544544

545545

546+
# THIS
546547
async def test_crawler_run_requests() -> None:
547548
crawler = BasicCrawler()
548549
seen_urls = list[str]()

0 commit comments

Comments
 (0)