Skip to content

Commit b165143

Browse files
authored
Fixes to RequestQueue (#82)
This fixes three bugs in `RequestQueue`: - when loading a queue from disk, `orderNo`s were not correctly converted back to `Decimal`s - when adding requests to the queue, even handled requests were added to the queue head - when opening an already populated queue, if you first added another request to it before you called `fetch_next_request()`, the queue head would not contain the requests that were in the queue before you opened it It also adds an E2E test which catches all three of these things.
1 parent 55a57fe commit b165143

File tree

4 files changed

+88
-4
lines changed

4 files changed

+88
-4
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ Changelog
88

99
- started triggering base Docker image builds when releasing a new version
1010

11+
### Fixed
12+
13+
- fixed `RequestQueue` not loading requests from an existing queue properly
14+
1115
[0.2.0](../../releases/tag/v0.2.0) - 2023-03-06
1216
-----------------------------------------------
1317

src/apify/_memory_storage/resource_clients/request_queue.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -437,6 +437,8 @@ def _create_from_directory(
437437

438438
with open(os.path.join(storage_directory, entry.name)) as f:
439439
request = json.load(f)
440+
if request.get('orderNo'):
441+
request['orderNo'] = Decimal(request.get('orderNo'))
440442
entries.append(request)
441443

442444
new_client = cls(

src/apify/storages/request_queue.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,8 @@ async def add_request(self, request: Dict, *, forefront: bool = False) -> Dict:
166166
self._cache_request(cache_key, queue_operation_info)
167167

168168
request_id, was_already_present = queue_operation_info['requestId'], queue_operation_info['wasAlreadyPresent']
169-
if not was_already_present and request_id not in self._in_progress and self._recently_handled.get(request_id) is None:
169+
is_handled = request.get('handledAt') is not None
170+
if not is_handled and not was_already_present and request_id not in self._in_progress and self._recently_handled.get(request_id) is None:
170171
self._assumed_total_count += 1
171172

172173
self._maybe_add_request_to_queue_head(request_id, forefront)
@@ -520,4 +521,6 @@ async def open(
520521
Returns:
521522
RequestQueue: An instance of the `RequestQueue` class for the given ID or name.
522523
"""
523-
return await super().open(id=id, name=name, force_cloud=force_cloud, config=config)
524+
queue = await super().open(id=id, name=name, force_cloud=force_cloud, config=config)
525+
await queue._ensure_head_is_non_empty()
526+
return queue

tests/unit/actor/test_actor_memory_storage_e2e.py

Lines changed: 77 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from datetime import datetime, timezone
12
from typing import Callable
23

34
import pytest
@@ -8,7 +9,7 @@
89

910

1011
@pytest.mark.parametrize('purge_on_start', [True, False])
11-
async def test_actor_memory_storage_client_e2e(
12+
async def test_actor_memory_storage_client_key_value_store_e2e(
1213
monkeypatch: pytest.MonkeyPatch,
1314
purge_on_start: bool,
1415
reset_default_instances: Callable[[], None],
@@ -17,7 +18,7 @@ async def test_actor_memory_storage_client_e2e(
1718
The second run attempts to access data created by the first one.
1819
We run 2 configurations with different `purge_on_start`."""
1920
# Configure purging env var
20-
monkeypatch.setenv(ApifyEnvVars.PURGE_ON_START, 'true' if purge_on_start else 'false')
21+
monkeypatch.setenv(ApifyEnvVars.PURGE_ON_START, f'{int(purge_on_start)}')
2122
# Store old storage client so we have the object reference for comparison
2223
old_client = StorageClientManager.get_storage_client()
2324
async with Actor:
@@ -45,3 +46,77 @@ async def test_actor_memory_storage_client_e2e(
4546
else:
4647
assert default_value == 'default value'
4748
assert non_default_value == 'non-default value'
49+
50+
51+
@pytest.mark.parametrize('purge_on_start', [True, False])
52+
async def test_actor_memory_storage_client_request_queue_e2e(
53+
monkeypatch: pytest.MonkeyPatch,
54+
purge_on_start: bool,
55+
reset_default_instances: Callable[[], None],
56+
) -> None:
57+
"""This test simulates two clean runs using memory storage.
58+
The second run attempts to access data created by the first one.
59+
We run 2 configurations with different `purge_on_start`."""
60+
# Configure purging env var
61+
monkeypatch.setenv(ApifyEnvVars.PURGE_ON_START, f'{int(purge_on_start)}')
62+
async with Actor:
63+
# Add some requests to the default queue
64+
default_queue = await Actor.open_request_queue()
65+
for i in range(6):
66+
request_url = f'http://example.com/{i}'
67+
forefront = i % 3 == 1
68+
was_handled = i % 3 == 2
69+
await default_queue.add_request({
70+
'uniqueKey': str(i),
71+
'url': request_url,
72+
'handledAt': datetime.now(timezone.utc) if was_handled else None,
73+
}, forefront=forefront)
74+
75+
# We simulate another clean run, we expect the memory storage to read from the local data directory
76+
# Default storages are purged based on purge_on_start parameter.
77+
reset_default_instances()
78+
79+
async with Actor:
80+
# Add some more requests to the default queue
81+
default_queue = await Actor.open_request_queue()
82+
for i in range(6, 12):
83+
request_url = f'http://example.com/{i}'
84+
forefront = i % 3 == 1
85+
was_handled = i % 3 == 2
86+
await default_queue.add_request({
87+
'uniqueKey': str(i),
88+
'url': request_url,
89+
'handledAt': datetime.now(timezone.utc) if was_handled else None,
90+
}, forefront=forefront)
91+
92+
queue_info = await default_queue.get_info()
93+
assert queue_info is not None
94+
95+
# If the queue was purged between the runs, only the requests from the second run should be present, in the right order
96+
if purge_on_start:
97+
assert queue_info.get('totalRequestCount') == 6
98+
assert queue_info.get('handledRequestCount') == 2
99+
100+
expected_pending_request_order = [10, 7, 6, 9]
101+
for request_number in expected_pending_request_order:
102+
next_request = await default_queue.fetch_next_request()
103+
assert next_request is not None
104+
assert next_request.get('uniqueKey') == f'{request_number}'
105+
assert next_request.get('url') == f'http://example.com/{request_number}'
106+
107+
next_request = await default_queue.fetch_next_request()
108+
assert next_request is None
109+
# If the queue was NOT purged between the runs, all the requests should be in the queue in the right order
110+
else:
111+
assert queue_info.get('totalRequestCount') == 12
112+
assert queue_info.get('handledRequestCount') == 4
113+
114+
expected_pending_request_order = [10, 7, 4, 1, 0, 3, 6, 9]
115+
for request_number in expected_pending_request_order:
116+
next_request = await default_queue.fetch_next_request()
117+
assert next_request is not None
118+
assert next_request.get('uniqueKey') == f'{request_number}'
119+
assert next_request.get('url') == f'http://example.com/{request_number}'
120+
121+
next_request = await default_queue.fetch_next_request()
122+
assert next_request is None

0 commit comments

Comments
 (0)