Commit 8df5054

RQ and Apify clients (will be moved to SDK later)
1 parent 761ed16 commit 8df5054

19 files changed, +931 -95 lines changed

docs/guides/code_examples/storages/rq_basic_example.py

Lines changed: 1 addition & 1 deletion
@@ -12,7 +12,7 @@ async def main() -> None:
     await request_queue.add_request('https://apify.com/')

     # Add multiple requests as a batch.
-    await request_queue.add_requests_batched(
+    await request_queue.add_requests(
         ['https://crawlee.dev/', 'https://crawlee.dev/python/']
     )
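
For orientation, a sketch of the updated docs example as a complete script; the crawlee.storages import path is taken from the library's public API and is assumed to be unchanged by this commit.

import asyncio

from crawlee.storages import RequestQueue


async def main() -> None:
    request_queue = await RequestQueue.open()

    # Add a single request.
    await request_queue.add_request('https://apify.com/')

    # Add multiple requests as a batch, using the renamed method.
    await request_queue.add_requests(['https://crawlee.dev/', 'https://crawlee.dev/python/'])


asyncio.run(main())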

docs/guides/code_examples/storages/rq_with_crawler_explicit_example.py

Lines changed: 1 addition & 1 deletion
@@ -10,7 +10,7 @@ async def main() -> None:
     request_queue = await RequestQueue.open(name='my-request-queue')

     # Interact with the request queue directly, e.g. add a batch of requests.
-    await request_queue.add_requests_batched(
+    await request_queue.add_requests(
         ['https://apify.com/', 'https://crawlee.dev/']
     )

src/crawlee/_utils/file.py

Lines changed: 1 addition & 5 deletions
@@ -134,11 +134,7 @@ async def export_json_to_stream(
     **kwargs: Unpack[ExportDataJsonKwargs],
 ) -> None:
     items = [item async for item in iterator]
-
-    if items:
-        json.dump(items, dst, **kwargs)
-    else:
-        logger.warning('Attempting to export an empty dataset - no file will be created')
+    json.dump(items, dst, **kwargs)


 async def export_csv_to_stream(
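
The practical effect of this change is that an empty dataset is now exported as an empty JSON array instead of being skipped with a warning. A minimal sketch, assuming the iterator and dst parameter names seen in the diff can be passed as keywords:

import asyncio
import io
from collections.abc import AsyncIterator

from crawlee._utils.file import export_json_to_stream


async def no_items() -> AsyncIterator[dict]:
    return
    yield {}  # never reached; only marks this function as an async generator


async def main() -> None:
    dst = io.StringIO()
    # Before this commit, an empty iterator logged a warning and wrote nothing;
    # now json.dump always runs, so the stream receives '[]'.
    await export_json_to_stream(iterator=no_items(), dst=dst)
    print(dst.getvalue())


asyncio.run(main())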

src/crawlee/crawlers/_basic/_basic_crawler.py

Lines changed: 2 additions & 2 deletions
@@ -648,7 +648,7 @@ async def add_requests(
         """
         request_manager = await self.get_request_manager()

-        await request_manager.add_requests_batched(
+        await request_manager.add_requests(
             requests=requests,
             batch_size=batch_size,
             wait_time_between_batches=wait_time_between_batches,
@@ -976,7 +976,7 @@ async def _commit_request_handler_result(self, context: BasicCrawlingContext) ->
                 ):
                     requests.append(dst_request)

-            await request_manager.add_requests_batched(requests)
+            await request_manager.add_requests(requests)

         for push_data_call in result.push_data_calls:
             await self._push_data(**push_data_call)
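
At the crawler level the behaviour is unchanged; only the delegated method name differs. A hedged sketch, assuming BasicCrawler and BasicCrawlingContext are exported from crawlee.crawlers as in the current public API:

import asyncio

from crawlee.crawlers import BasicCrawler, BasicCrawlingContext


async def main() -> None:
    crawler = BasicCrawler()

    @crawler.router.default_handler
    async def handler(context: BasicCrawlingContext) -> None:
        context.log.info(f'Processing {context.request.url} ...')

    # Crawler.add_requests now forwards to RequestManager.add_requests
    # (formerly add_requests_batched) on the underlying request manager.
    await crawler.add_requests(['https://crawlee.dev/', 'https://apify.com/'])
    await crawler.run()


asyncio.run(main())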

src/crawlee/request_loaders/_request_loader.py

Lines changed: 3 additions & 4 deletions
@@ -5,11 +5,13 @@

 from crawlee import Request
 from crawlee._utils.docs import docs_group
+from crawlee.request_loaders import RequestManagerTandem
+from crawlee.storages import RequestQueue

 if TYPE_CHECKING:
     from collections.abc import Sequence

-    from crawlee.request_loaders import RequestManager, RequestManagerTandem
+    from crawlee.request_loaders import RequestManager
     from crawlee.storage_clients.models import ProcessedRequest


@@ -56,9 +58,6 @@ async def to_tandem(self, request_manager: RequestManager | None = None) -> Requ
             request_manager: Request manager to combine the loader with.
                 If None is given, the default request queue is used.
         """
-        from crawlee.request_loaders import RequestManagerTandem
-        from crawlee.storages import RequestQueue
-
         if request_manager is None:
             request_manager = await RequestQueue.open()
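
Moving the RequestManagerTandem and RequestQueue imports to module level does not change how to_tandem is used. A short sketch, assuming RequestList remains available in crawlee.request_loaders as a read-only loader:

import asyncio

from crawlee.request_loaders import RequestList


async def main() -> None:
    loader = RequestList(['https://crawlee.dev/', 'https://apify.com/'])

    # Combines the read-only loader with the default RequestQueue, so requests
    # discovered during the crawl can still be enqueued through the tandem.
    tandem = await loader.to_tandem()
    print(type(tandem).__name__)  # RequestManagerTandem


asyncio.run(main())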

src/crawlee/request_loaders/_request_manager.py

Lines changed: 12 additions & 4 deletions
@@ -6,12 +6,12 @@

 from crawlee._utils.docs import docs_group
 from crawlee.request_loaders._request_loader import RequestLoader
+from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest

 if TYPE_CHECKING:
     from collections.abc import Sequence

     from crawlee._request import Request
-    from crawlee.storage_clients.models import ProcessedRequest


 @docs_group('Abstract classes')
@@ -40,27 +40,35 @@ async def add_request(
             Information about the request addition to the manager.
         """

-    async def add_requests_batched(
+    async def add_requests(
         self,
         requests: Sequence[str | Request],
         *,
+        forefront: bool = False,
         batch_size: int = 1000,  # noqa: ARG002
         wait_time_between_batches: timedelta = timedelta(seconds=1),  # noqa: ARG002
         wait_for_all_requests_to_be_added: bool = False,  # noqa: ARG002
         wait_for_all_requests_to_be_added_timeout: timedelta | None = None,  # noqa: ARG002
-    ) -> None:
+    ) -> AddRequestsResponse:
         """Add requests to the manager in batches.

         Args:
             requests: Requests to enqueue.
+            forefront: If True, add requests to the beginning of the queue.
             batch_size: The number of requests to add in one batch.
             wait_time_between_batches: Time to wait between adding batches.
             wait_for_all_requests_to_be_added: If True, wait for all requests to be added before returning.
             wait_for_all_requests_to_be_added_timeout: Timeout for waiting for all requests to be added.
         """
         # Default and dumb implementation.
+        processed_requests = list[ProcessedRequest]()
         for request in requests:
-            await self.add_request(request)
+            processed_request = await self.add_request(request, forefront=forefront)
+            processed_requests.append(processed_request)
+        return AddRequestsResponse(
+            processed_requests=processed_requests,
+            unprocessed_requests=[],
+        )

     @abstractmethod
     async def reclaim_request(self, request: Request, *, forefront: bool = False) -> ProcessedRequest | None:
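
A sketch of how a caller might consume the new contract; manager stands for any concrete RequestManager implementation (for example a RequestQueue), which is an assumption rather than something this diff shows:

from crawlee.request_loaders import RequestManager
from crawlee.storage_clients.models import AddRequestsResponse


async def enqueue_seed_urls(manager: RequestManager) -> AddRequestsResponse:
    # The default base-class implementation loops over add_request() and
    # collects each ProcessedRequest into the returned response.
    return await manager.add_requests(
        ['https://crawlee.dev/', 'https://apify.com/'],
        forefront=False,
    )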

src/crawlee/request_loaders/_request_manager_tandem.py

Lines changed: 2 additions & 2 deletions
@@ -49,7 +49,7 @@ async def add_request(self, request: str | Request, *, forefront: bool = False)
         return await self._read_write_manager.add_request(request, forefront=forefront)

     @override
-    async def add_requests_batched(
+    async def add_requests(
         self,
         requests: Sequence[str | Request],
         *,
@@ -58,7 +58,7 @@ async def add_requests_batched(
         wait_for_all_requests_to_be_added: bool = False,
         wait_for_all_requests_to_be_added_timeout: timedelta | None = None,
     ) -> None:
-        return await self._read_write_manager.add_requests_batched(
+        return await self._read_write_manager.add_requests(
             requests,
             batch_size=batch_size,
             wait_time_between_batches=wait_time_between_batches,

Lines changed: 11 additions & 0 deletions
@@ -0,0 +1,11 @@
+from ._dataset_client import ApifyDatasetClient
+from ._key_value_store_client import ApifyKeyValueStoreClient
+from ._request_queue_client import ApifyRequestQueueClient
+from ._storage_client import ApifyStorageClient
+
+__all__ = [
+    'ApifyDatasetClient',
+    'ApifyKeyValueStoreClient',
+    'ApifyRequestQueueClient',
+    'ApifyStorageClient',
+]

Lines changed: 198 additions & 0 deletions
@@ -0,0 +1,198 @@
+from __future__ import annotations
+
+import asyncio
+from logging import getLogger
+from typing import TYPE_CHECKING, Any, ClassVar
+
+from apify_client import ApifyClientAsync
+from typing_extensions import override
+
+from crawlee.storage_clients._base import DatasetClient
+from crawlee.storage_clients.models import DatasetItemsListPage, DatasetMetadata
+
+if TYPE_CHECKING:
+    from collections.abc import AsyncIterator
+    from datetime import datetime
+
+    from apify_client.clients import DatasetClientAsync
+
+    from crawlee.configuration import Configuration
+
+logger = getLogger(__name__)
+
+
+class ApifyDatasetClient(DatasetClient):
+    """An Apify platform implementation of the dataset client."""
+
+    _cache_by_name: ClassVar[dict[str, ApifyDatasetClient]] = {}
+    """A dictionary to cache clients by their names."""
+
+    def __init__(
+        self,
+        *,
+        id: str,
+        name: str,
+        created_at: datetime,
+        accessed_at: datetime,
+        modified_at: datetime,
+        item_count: int,
+        api_client: DatasetClientAsync,
+    ) -> None:
+        """Initialize a new instance.
+
+        Preferably use the `ApifyDatasetClient.open` class method to create a new instance.
+        """
+        self._metadata = DatasetMetadata(
+            id=id,
+            name=name,
+            created_at=created_at,
+            accessed_at=accessed_at,
+            modified_at=modified_at,
+            item_count=item_count,
+        )
+
+        self._api_client = api_client
+        """The Apify dataset client for API operations."""
+
+        self._lock = asyncio.Lock()
+        """A lock to ensure that only one operation is performed at a time."""
+
+    @override
+    @property
+    def metadata(self) -> DatasetMetadata:
+        return self._metadata
+
+    @override
+    @classmethod
+    async def open(
+        cls,
+        *,
+        id: str | None,
+        name: str | None,
+        configuration: Configuration,
+    ) -> ApifyDatasetClient:
+        default_name = configuration.default_dataset_id
+        token = 'configuration.apify_token'  # TODO: use the real value
+        api_url = 'configuration.apify_api_url'  # TODO: use the real value
+
+        name = name or default_name
+
+        # Check if the client is already cached by name.
+        if name in cls._cache_by_name:
+            client = cls._cache_by_name[name]
+            await client._update_metadata()  # noqa: SLF001
+            return client
+
+        # Otherwise, create a new one.
+        apify_client_async = ApifyClientAsync(
+            token=token,
+            api_url=api_url,
+            max_retries=8,
+            min_delay_between_retries_millis=500,
+            timeout_secs=360,
+        )
+
+        apify_datasets_client = apify_client_async.datasets()
+
+        metadata = DatasetMetadata.model_validate(
+            await apify_datasets_client.get_or_create(name=id if id is not None else name),
+        )
+
+        apify_dataset_client = apify_client_async.dataset(dataset_id=metadata.id)
+
+        client = cls(
+            id=metadata.id,
+            name=metadata.name,
+            created_at=metadata.created_at,
+            accessed_at=metadata.accessed_at,
+            modified_at=metadata.modified_at,
+            item_count=metadata.item_count,
+            api_client=apify_dataset_client,
+        )
+
+        # Cache the client by name.
+        cls._cache_by_name[name] = client
+
+        return client
+
+    @override
+    async def drop(self) -> None:
+        async with self._lock:
+            await self._api_client.delete()
+
+            # Remove the client from the cache.
+            if self.metadata.name in self.__class__._cache_by_name:  # noqa: SLF001
+                del self.__class__._cache_by_name[self.metadata.name]  # noqa: SLF001
+
+    @override
+    async def push_data(self, data: list[Any] | dict[str, Any]) -> None:
+        async with self._lock:
+            await self._api_client.push_items(items=data)
+            await self._update_metadata()
+
+    @override
+    async def get_data(
+        self,
+        *,
+        offset: int = 0,
+        limit: int | None = 999_999_999_999,
+        clean: bool = False,
+        desc: bool = False,
+        fields: list[str] | None = None,
+        omit: list[str] | None = None,
+        unwind: str | None = None,
+        skip_empty: bool = False,
+        skip_hidden: bool = False,
+        flatten: list[str] | None = None,
+        view: str | None = None,
+    ) -> DatasetItemsListPage:
+        response = await self._api_client.list_items(
+            offset=offset,
+            limit=limit,
+            clean=clean,
+            desc=desc,
+            fields=fields,
+            omit=omit,
+            unwind=unwind,
+            skip_empty=skip_empty,
+            skip_hidden=skip_hidden,
+            flatten=flatten,
+            view=view,
+        )
+        result = DatasetItemsListPage.model_validate(vars(response))
+        await self._update_metadata()
+        return result
+
+    @override
+    async def iterate_items(
+        self,
+        *,
+        offset: int = 0,
+        limit: int | None = None,
+        clean: bool = False,
+        desc: bool = False,
+        fields: list[str] | None = None,
+        omit: list[str] | None = None,
+        unwind: str | None = None,
+        skip_empty: bool = False,
+        skip_hidden: bool = False,
+    ) -> AsyncIterator[dict]:
+        async for item in self._api_client.iterate_items(
+            offset=offset,
+            limit=limit,
+            clean=clean,
+            desc=desc,
+            fields=fields,
+            omit=omit,
+            unwind=unwind,
+            skip_empty=skip_empty,
+            skip_hidden=skip_hidden,
+        ):
+            yield item
+
+        await self._update_metadata()
+
+    async def _update_metadata(self) -> None:
+        """Update the dataset metadata file with current information."""
+        metadata = await self._api_client.get()
+        self._metadata = DatasetMetadata.model_validate(metadata)
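
A hedged usage sketch of the new dataset client. The import path is hypothetical (the diff does not show where the package is exported, and the commit message says it will move to the SDK), and the token/api_url handling above is still stubbed with TODOs, so this will not authenticate as-is:

import asyncio

from crawlee.configuration import Configuration

# Hypothetical import path; adjust to wherever ApifyDatasetClient ends up being exported.
from apify.storage_clients import ApifyDatasetClient


async def main() -> None:
    client = await ApifyDatasetClient.open(
        id=None,
        name='my-dataset',
        configuration=Configuration(),
    )

    await client.push_data([{'url': 'https://crawlee.dev/'}, {'url': 'https://apify.com/'}])

    async for item in client.iterate_items():
        print(item)


asyncio.run(main())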
