Skip to content

Commit d7b19ee

Browse files
committed
RQ FS fetch performance for is_empty
1 parent 6bbd977 commit d7b19ee

File tree

1 file changed

+57
-21
lines changed

1 file changed

+57
-21
lines changed

src/crawlee/storage_clients/_file_system/_request_queue_client.py

Lines changed: 57 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -118,9 +118,12 @@ def __init__(
118118
self._request_cache = deque[Request]()
119119
"""Cache for requests: forefront requests at the beginning, regular requests at the end."""
120120

121-
self._cache_needs_refresh = True
121+
self._request_cache_needs_refresh = True
122122
"""Flag indicating whether the cache needs to be refreshed from filesystem."""
123123

124+
self._is_empty_cache: bool | None = None
125+
"""Cache for is_empty result: None means unknown, True/False is cached state."""
126+
124127
@property
125128
@override
126129
def metadata(self) -> RequestQueueMetadata:
@@ -203,10 +206,23 @@ async def open(
203206

204207
# If the RQ directory exists, reconstruct the client from the metadata file.
205208
if rq_path.exists():
206-
# If metadata file is missing, raise an error.
207209
if not metadata_path.exists():
208-
raise ValueError(f'Metadata file not found for request queue "{name}"')
209-
210+
now = datetime.now(timezone.utc)
211+
metadata = RequestQueueMetadata(
212+
id=crypto_random_object_id(),
213+
name=name,
214+
created_at=now,
215+
accessed_at=now,
216+
modified_at=now,
217+
had_multiple_clients=False,
218+
handled_request_count=0,
219+
pending_request_count=0,
220+
stats={},
221+
total_request_count=0,
222+
)
223+
await asyncio.to_thread(rq_path.mkdir, parents=True, exist_ok=True)
224+
data = await json_dumps(metadata.model_dump())
225+
await atomic_write(metadata_path, data)
210226
file = await asyncio.to_thread(open, metadata_path)
211227
try:
212228
file_content = json.load(file)
@@ -260,7 +276,10 @@ async def drop(self) -> None:
260276

261277
self._in_progress.clear()
262278
self._request_cache.clear()
263-
self._cache_needs_refresh = True
279+
self._request_cache_needs_refresh = True
280+
281+
# Invalidate is_empty cache.
282+
self._is_empty_cache = None
264283

265284
@override
266285
async def purge(self) -> None:
@@ -272,15 +291,17 @@ async def purge(self) -> None:
272291

273292
self._in_progress.clear()
274293
self._request_cache.clear()
275-
self._cache_needs_refresh = True
294+
self._request_cache_needs_refresh = True
276295

277-
# Update metadata counts
278296
await self._update_metadata(
279297
update_modified_at=True,
280298
update_accessed_at=True,
281299
new_pending_request_count=0,
282300
)
283301

302+
# Invalidate is_empty cache.
303+
self._is_empty_cache = None
304+
284305
@override
285306
async def add_batch_of_requests(
286307
self,
@@ -298,6 +319,7 @@ async def add_batch_of_requests(
298319
Response containing information about the added requests.
299320
"""
300321
async with self._lock:
322+
self._is_empty_cache = None
301323
new_total_request_count = self._metadata.total_request_count
302324
new_pending_request_count = self._metadata.pending_request_count
303325
processed_requests = list[ProcessedRequest]()
@@ -409,7 +431,10 @@ async def add_batch_of_requests(
409431

410432
# Invalidate the cache if we added forefront requests.
411433
if forefront:
412-
self._cache_needs_refresh = True
434+
self._request_cache_needs_refresh = True
435+
436+
# Invalidate is_empty cache.
437+
self._is_empty_cache = None
413438

414439
return AddRequestsResponse(
415440
processed_requests=processed_requests,
@@ -450,7 +475,7 @@ async def fetch_next_request(self) -> Request | None:
450475
"""
451476
async with self._lock:
452477
# Refresh cache if needed or if it's empty.
453-
if self._cache_needs_refresh or not self._request_cache:
478+
if self._request_cache_needs_refresh or not self._request_cache:
454479
await self._refresh_cache()
455480

456481
next_request: Request | None = None
@@ -481,6 +506,8 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
481506
Information about the queue operation. `None` if the given request was not in progress.
482507
"""
483508
async with self._lock:
509+
self._is_empty_cache = None
510+
484511
# Check if the request is in progress.
485512
if request.id not in self._in_progress:
486513
logger.warning(f'Marking request {request.id} as handled that is not in progress.')
@@ -537,6 +564,8 @@ async def reclaim_request(
537564
Information about the queue operation. `None` if the given request was not in progress.
538565
"""
539566
async with self._lock:
567+
self._is_empty_cache = None
568+
540569
# Check if the request is in progress.
541570
if request.id not in self._in_progress:
542571
logger.info(f'Reclaiming request {request.id} that is not in progress.')
@@ -587,28 +616,35 @@ async def reclaim_request(
587616

588617
@override
589618
async def is_empty(self) -> bool:
590-
"""Check if the queue is empty.
591-
592-
Returns:
593-
True if the queue is empty, False otherwise.
594-
"""
619+
"""Check if the queue is empty, using a cached value if available and valid."""
595620
async with self._lock:
621+
# If we have a cached value, return it immediately.
622+
if self._is_empty_cache is not None:
623+
return self._is_empty_cache
624+
625+
# If we have cached requests, check them first (fast path).
626+
if self._request_cache:
627+
for req in self._request_cache:
628+
if req.handled_at is None:
629+
self._is_empty_cache = False
630+
return False
631+
self._is_empty_cache = True
632+
return True
633+
634+
# Fallback: check files on disk (slow path).
596635
await self._update_metadata(update_accessed_at=True)
597636
request_files = await self._get_request_files(self.path_to_rq)
598637

599-
# Check each file to see if there are any unhandled requests.
600638
for request_file in request_files:
601639
request = await self._parse_request_file(request_file)
602-
603640
if request is None:
604641
continue
605-
606-
# If any request is not handled, the queue is not empty.
607642
if request.handled_at is None:
643+
self._is_empty_cache = False
608644
return False
609645

610-
# If we got here, all requests are handled or there are no requests.
611-
return True
646+
self._is_empty_cache = True
647+
return True
612648

613649
def _get_request_path(self, request_id: str) -> Path:
614650
"""Get the path to a specific request file.
@@ -732,7 +768,7 @@ async def _refresh_cache(self) -> None:
732768
break
733769
self._request_cache.append(request)
734770

735-
self._cache_needs_refresh = False
771+
self._request_cache_needs_refresh = False
736772

737773
@classmethod
738774
async def _get_request_files(cls, path_to_rq: Path) -> list[Path]:

0 commit comments

Comments
 (0)