Skip to content

Commit 4d6d143

Browse files
committed
FS RQ client uses a sequence counter to guarantee request ordering
1 parent 682521a commit 4d6d143

File tree

3 files changed

+75
-25
lines changed

3 files changed

+75
-25
lines changed

pyproject.toml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ ignore = [
143143
"ISC001", # This rule may cause conflicts when used with the formatter
144144
"FIX", # flake8-fixme
145145
"PLR0911", # Too many return statements
146+
"PLR0912", # Too many branches
146147
"PLR0913", # Too many arguments in function definition
147148
"PLR0915", # Too many statements
148149
"PYI034", # `__aenter__` methods in classes like `{name}` usually return `self` at runtime
@@ -204,9 +205,6 @@ builtins-ignorelist = ["id"]
204205
[tool.ruff.lint.isort]
205206
known-first-party = ["crawlee"]
206207

207-
[tool.ruff.lint.pylint]
208-
max-branches = 18
209-
210208
[tool.pytest.ini_options]
211209
addopts = "-ra"
212210
asyncio_default_fixture_loop_scope = "function"

src/crawlee/_utils/globs.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ def _translate(
7373
return rf'(?s:{res})\Z'
7474

7575

76-
def _fnmatch_translate(pat: str, star: str, question_mark: str) -> list[str]: # noqa: PLR0912
76+
def _fnmatch_translate(pat: str, star: str, question_mark: str) -> list[str]:
7777
"""Copy of fnmatch._translate from Python 3.13."""
7878
res = list[str]()
7979
add = res.append

src/crawlee/storage_clients/_file_system/_request_queue_client.py

Lines changed: 73 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,9 @@ def __init__(
9696
"""A list of request IDs that should be prioritized (added with forefront=True).
9797
Most recent forefront requests are added at the beginning of the list."""
9898

99+
self._sequence_counter = 0
100+
"""A counter to track the order of requests added to the queue."""
101+
99102
@override
100103
@property
101104
def metadata(self) -> RequestQueueMetadata:
@@ -277,10 +280,6 @@ async def add_batch_of_requests(
277280
await asyncio.to_thread(self.path_to_rq.mkdir, parents=True, exist_ok=True)
278281

279282
for request in requests:
280-
# Ensure the request has an ID
281-
if not request.id:
282-
request.id = crypto_random_object_id()
283-
284283
# Check if the request is already in the queue by unique_key
285284
existing_request = None
286285

@@ -357,6 +356,11 @@ async def add_batch_of_requests(
357356
if request_dict.get('handled_at') is None:
358357
request_dict.pop('handled_at', None)
359358

359+
# Add sequence number to ensure FIFO ordering
360+
sequence_number = self._sequence_counter
361+
self._sequence_counter += 1
362+
request_dict['_sequence'] = sequence_number
363+
360364
request_data = await json_dumps(request_dict)
361365
await asyncio.to_thread(request_path.write_text, request_data, encoding='utf-8')
362366

@@ -471,10 +475,10 @@ async def fetch_next_request(self) -> Request | None:
471475

472476
# List all request files for regular (non-forefront) requests
473477
request_files = await asyncio.to_thread(list, self.path_to_rq.glob('*.json'))
474-
regular_requests = []
475478

476-
# Get file creation times for sorting regular requests in FIFO order
477-
request_file_times = {}
479+
# Dictionary to store request files by their sequence number
480+
request_sequences = {}
481+
requests_without_sequence = []
478482

479483
# Filter out metadata files and in-progress requests
480484
for request_file in request_files:
@@ -489,25 +493,34 @@ async def fetch_next_request(self) -> Request | None:
489493
if request_id in self._in_progress or request_id in self._forefront_requests:
490494
continue
491495

492-
# Get file creation/modification time for FIFO ordering
496+
# Read the file to get the sequence number
493497
try:
494-
file_stat = await asyncio.to_thread(request_file.stat)
495-
request_file_times[request_file] = file_stat.st_mtime
496-
except Exception:
497-
# If we can't get the time, use 0 (oldest)
498-
request_file_times[request_file] = 0
499-
500-
regular_requests.append(request_file)
501-
502-
# Sort regular requests by creation time (FIFO order)
503-
regular_requests.sort(key=lambda f: request_file_times[f])
498+
file = await asyncio.to_thread(open, request_file)
499+
try:
500+
file_content = json.load(file)
501+
# Skip if already handled
502+
if file_content.get('handled_at') is not None:
503+
continue
504+
505+
# Use sequence number for ordering if available
506+
sequence_number = file_content.get('_sequence')
507+
if sequence_number is not None:
508+
request_sequences[sequence_number] = request_file
509+
else:
510+
# For backward compatibility with existing files
511+
requests_without_sequence.append(request_file)
512+
finally:
513+
await asyncio.to_thread(file.close)
514+
except (json.JSONDecodeError, ValidationError) as exc:
515+
logger.warning(f'Failed to parse request file {request_file}: {exc!s}')
504516

505-
# Process regular requests in FIFO order
506-
for request_file in regular_requests:
517+
# Process requests with sequence numbers first, in FIFO order
518+
for sequence in sorted(request_sequences.keys()):
519+
request_file = request_sequences[sequence]
507520
file = await asyncio.to_thread(open, request_file)
508521
try:
509522
file_content = json.load(file)
510-
# Skip if already handled
523+
# Skip if already handled (double-check)
511524
if file_content.get('handled_at') is not None:
512525
continue
513526

@@ -526,6 +539,45 @@ async def fetch_next_request(self) -> Request | None:
526539
finally:
527540
await asyncio.to_thread(file.close)
528541

542+
# Process requests without sequence numbers using file timestamps (backward compatibility)
543+
if requests_without_sequence:
544+
# Get file creation times for sorting
545+
request_file_times = {}
546+
for request_file in requests_without_sequence:
547+
try:
548+
file_stat = await asyncio.to_thread(request_file.stat)
549+
request_file_times[request_file] = file_stat.st_mtime
550+
except Exception: # noqa: PERF203
551+
# If we can't get the time, use 0 (oldest)
552+
request_file_times[request_file] = 0
553+
554+
# Sort by creation time
555+
requests_without_sequence.sort(key=lambda f: request_file_times[f])
556+
557+
# Process requests without sequence in file timestamp order
558+
for request_file in requests_without_sequence:
559+
file = await asyncio.to_thread(open, request_file)
560+
try:
561+
file_content = json.load(file)
562+
# Skip if already handled
563+
if file_content.get('handled_at') is not None:
564+
continue
565+
566+
# Create request object
567+
request = Request(**file_content)
568+
569+
# Mark as in-progress in memory
570+
self._in_progress.add(request.id)
571+
572+
# Update accessed timestamp
573+
await self._update_metadata(update_accessed_at=True)
574+
except (json.JSONDecodeError, ValidationError) as exc:
575+
logger.warning(f'Failed to parse request file {request_file}: {exc!s}')
576+
else:
577+
return request
578+
finally:
579+
await asyncio.to_thread(file.close)
580+
529581
return None
530582

531583
@override

0 commit comments

Comments (0)