Commit 52a5a79

RQ FS performance optimization in add_requests
1 parent 224847c commit 52a5a79


src/crawlee/storage_clients/_file_system/_request_queue_client.py

Lines changed: 19 additions & 15 deletions
@@ -303,23 +303,24 @@ async def add_batch_of_requests(
         processed_requests = list[ProcessedRequest]()
         unprocessed_requests = list[UnprocessedRequest]()
 
+        # Prepare a dictionary to track existing requests by their unique keys.
+        existing_unique_keys: dict[str, Path] = {}
+        existing_request_files = await self._get_request_files(self.path_to_rq)
+
+        for request_file in existing_request_files:
+            existing_request = await self._parse_request_file(request_file)
+            if existing_request is not None:
+                existing_unique_keys[existing_request.unique_key] = request_file
+
+        # Process each request in the batch.
         for request in requests:
-            existing_request_files = await self._get_request_files(self.path_to_rq)
+            existing_request_file = existing_unique_keys.get(request.unique_key)
             existing_request = None
 
-            # Go through existing requests to find if the request already exists in the queue.
-            for existing_request_file in existing_request_files:
+            # Only load the full request from disk if we found a duplicate
+            if existing_request_file is not None:
                 existing_request = await self._parse_request_file(existing_request_file)
 
-                if existing_request is None:
-                    continue
-
-                # If the unique key matches, we found an existing request
-                if existing_request.unique_key == request.unique_key:
-                    break
-
-                existing_request = None
-
             # If there is no existing request with the same unique key, add the new request.
             if existing_request is None:
                 request_path = self._get_request_path(request.id)
@@ -343,6 +344,9 @@ async def add_batch_of_requests(
                 new_total_request_count += 1
                 new_pending_request_count += 1
 
+                # Add to our index for subsequent requests in this batch
+                existing_unique_keys[request.unique_key] = self._get_request_path(request.id)
+
                 processed_requests.append(
                     ProcessedRequest(
                         id=request.id,
@@ -352,7 +356,7 @@ async def add_batch_of_requests(
                     )
                 )
 
-            # If the request already exists, we need to update it.
+            # If the request already exists in the RQ, just update it if needed.
            else:
                 # Set the processed request flags.
                 was_already_present = existing_request is not None
@@ -371,10 +375,10 @@ async def add_batch_of_requests(
 
                 # If the request is already in the RQ but not handled yet, update it.
                 elif was_already_present and not was_already_handled:
-                    request_path = self._get_request_path(request.id)
+                    request_path = self._get_request_path(existing_request.id)
                     request_dict = existing_request.model_dump()
                     request_dict['__forefront'] = forefront
-                    request_data = await json_dumps(existing_request.model_dump())
+                    request_data = await json_dumps(request_dict)
                     await atomic_write(request_path, request_data)
 
                 processed_requests.append(
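
In short: the previous implementation re-listed and re-parsed every request file on disk for each request in the incoming batch, so deduplication cost grew roughly with batch size times queue size. This commit scans the queue directory once per call, builds an in-memory map from unique key to request file path, and then deduplicates each incoming request with a single dictionary lookup, only parsing a stored request when its key actually collides; newly written requests are added to the same map so later items in the batch see them. The last hunk also serializes the mutated request_dict (so the __forefront flag is actually persisted) and writes it to the existing request's path. Below is a minimal, self-contained sketch of the same indexing pattern, not the real crawlee client: the file layout, the 'uniqueKey'/'id' fields, and the helper names are assumptions for illustration only.

# Sketch of the indexing pattern applied by this commit: scan the queue
# directory once, map unique_key -> file path, then deduplicate each
# incoming request with a dictionary lookup instead of a full rescan.
# The JSON layout and field names here are illustrative, not crawlee's.
import json
from pathlib import Path


def read_unique_key(request_file: Path) -> str | None:
    """Return the unique key stored in a request file, or None if unreadable."""
    try:
        return json.loads(request_file.read_text())['uniqueKey']
    except (OSError, ValueError, KeyError):
        return None


def add_batch(queue_dir: Path, requests: list[dict]) -> list[dict]:
    """Add requests to the on-disk queue, skipping duplicates by unique key."""
    # Build the unique-key index once per batch (a single directory scan).
    existing_unique_keys: dict[str, Path] = {}
    for request_file in queue_dir.glob('*.json'):
        unique_key = read_unique_key(request_file)
        if unique_key is not None:
            existing_unique_keys[unique_key] = request_file

    added: list[dict] = []
    for request in requests:
        # O(1) duplicate check instead of re-reading every file on disk.
        if request['uniqueKey'] in existing_unique_keys:
            continue

        request_path = queue_dir / f"{request['id']}.json"
        request_path.write_text(json.dumps(request))
        added.append(request)

        # Index the new request so later items in the same batch see it.
        existing_unique_keys[request['uniqueKey']] = request_path

    return added

With this structure, disk reads scale with the number of files already in the queue plus the batch size, rather than with their product.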
