
refactor!: Introduce new storage client system #1194


Open

vdusek wants to merge 13 commits into master from the new-storage-clients branch.

Conversation

@vdusek (Collaborator) commented May 10, 2025

Description

  • I consolidated all commits from refactor!: Introduce new storage clients [WIP] #1107 into this new PR.
  • The previous storage-client implementation was completely replaced with redesigned clients, including:
    • The old "memory plus persist" client has been split into separate memory and file-system implementations.
    • The Configuration.persist_storage and Configuration.persist_metadata options were removed.
  • All old collection clients have been removed; they're no longer needed.
  • Each storage client now prints warnings if you pass method arguments it does not support.
  • The creation management modules in the storage clients and storages were removed.
  • Storage client parameters (e.g. purge_on_start, or token and base_api_url for the Apify client) are configured via the Configuration.
  • Every storage, and its corresponding client, now provides both a purge method (which clears all items but preserves the storage and metadata) and a drop method (which removes the entire storage, metadata included); see the sketch after this list.
  • All unused types, models, and helper utilities have been removed.
  • The detailed, per-storage/client changes are listed below.
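
A minimal sketch of the purge/drop distinction, using only the Dataset API described in this PR:

import asyncio

from crawlee.storages import Dataset


async def main() -> None:
    dataset = await Dataset.open(name='demo')
    await dataset.push_data({'item': 1})

    # purge clears all items but keeps the storage and its metadata.
    await dataset.purge()

    # drop removes the entire storage, metadata included.
    await dataset.drop()


if __name__ == '__main__':
    asyncio.run(main())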

Dataset

  • Properties:
    • id
    • name
    • metadata
  • Methods:
    • open
    • purge (new method)
    • drop
    • push_data
    • get_data
    • iterate_items
    • list_items (new method)
    • export_to
  • Breaking changes:
    • from_storage_object method has been removed - Use the open method with name or id instead.
    • get_info -> metadata property
    • storage_object -> metadata property
    • set_metadata method has been removed (it wasn't propagated to clients)
      • Do we want to support it (e.g. for renaming)?
    • write_to_json method has been removed - use export_to instead (see the sketch after the example below)
    • write_to_csv method has been removed - use export_to instead
import asyncio

from crawlee.storage_clients import FileSystemStorageClient
from crawlee.storages import Dataset


async def main() -> None:
    dataset = await Dataset.open(storage_client=FileSystemStorageClient())
    print(f'default dataset - ID: {dataset.id}, name: {dataset.name}')

    await dataset.push_data({'name': 'John'})
    await dataset.push_data({'name': 'John', 'age': 20})
    await dataset.push_data({})

    dataset_with_name = await Dataset.open(
        name='my_dataset',
        storage_client=FileSystemStorageClient(),
    )
    print(f'named dataset - ID: {dataset_with_name.id}, name: {dataset_with_name.name}')

    await dataset_with_name.push_data([{'age': 30}, {'age': 25}])

    print('Default dataset items:')
    async for item in dataset.iterate_items(skip_empty=True):
        print(item)

    print('Named dataset items:')
    async for item in dataset_with_name.iterate_items():
        print(item)

    items = await dataset.get_data()
    print(items)

    dataset_by_id = await Dataset.open(id=dataset_with_name.id)
    print(f'dataset by ID - ID: {dataset_by_id.id}, name: {dataset_by_id.name}')


if __name__ == '__main__':
    asyncio.run(main())
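
The removed write_to_json and write_to_csv methods are replaced by export_to. A minimal sketch; the key and content_type parameter names are assumptions, not confirmed by this PR:

import asyncio

from crawlee.storages import Dataset


async def main() -> None:
    dataset = await Dataset.open()
    await dataset.push_data([{'name': 'John'}, {'name': 'Jane'}])

    # Assumed parameters: the export is written into the default
    # key-value store under the given key.
    await dataset.export_to(key='dataset.json', content_type='json')
    await dataset.export_to(key='dataset.csv', content_type='csv')


if __name__ == '__main__':
    asyncio.run(main())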

Key-value store

  • Properties:
    • id
    • name
    • metadata
  • Methods:
    • open
    • purge (new method)
    • drop
    • get_value
    • set_value
    • delete_value (new method; the Apify platform's set_value supports setting an empty value for a key, so a separate method for deleting is useful)
    • iterate_keys
    • list_keys (new method)
    • get_public_url
    • get_auto_saved_value
    • persist_autosaved_values (see the sketch after the example below)
  • Breaking changes:
    • from_storage_object method has been removed - Use the open method with name or id instead.
    • get_info -> metadata property
    • storage_object -> metadata property
    • set_metadata method has been removed (it wasn't propagated to clients)
      • Do we want to support it (e.g. for renaming)?
import asyncio

import requests

from crawlee.storage_clients import FileSystemStorageClient
from crawlee.storages import KeyValueStore


async def main() -> None:
    print('Opening key-value store "my_kvs"...')
    storage_client = FileSystemStorageClient()
    kvs = await KeyValueStore.open(name='my_kvs', storage_client=storage_client)

    print('Setting value to "file.json"...')
    await kvs.set_value('file.json', {'key': 'value'})

    print('Setting value to "file.jpg"...')
    response = requests.get('https://avatars.githubusercontent.com/u/25082181')
    await kvs.set_value('file.jpg', response.content)

    print('Iterating over keys:')
    async for key in kvs.iterate_keys():
        print(f'Key: {key}')

    print('Listing keys:')
    keys = [key.key for key in await kvs.list_keys()]
    print(f'Keys: {keys}')

    for key in keys:
        print(f'Getting value of {key}...')
        value = await kvs.get_value(key)
        print(f'Value: {str(value)[:100]}')

    print('Deleting value of "file.json"...')
    await kvs.delete_value('file.json')

    kvs_default = await KeyValueStore.open(storage_client=storage_client)

    special_key = 'key with spaces/and/slashes!@#$%^&*()'
    test_value = 'Special key value'

    await kvs_default.set_value(key=special_key, value=test_value)

    record = await kvs_default.get_value(key=special_key)
    assert record is not None
    assert record == test_value

    result = await kvs_default.list_keys()
    print(f'kvs_default list keys = {result}')

    kvs_2 = await KeyValueStore.open()
    result = await kvs_2.list_keys()
    print(f'kvs_2 list keys = {result}')


if __name__ == '__main__':
    asyncio.run(main())
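
The auto-saved values API from the method list above is not exercised in the example. A minimal sketch, assuming get_auto_saved_value returns a mutable dict that is persisted automatically and persist_autosaved_values forces a write:

import asyncio

from crawlee.storages import KeyValueStore


async def main() -> None:
    kvs = await KeyValueStore.open(name='my_kvs')

    # Assumption: the returned dict is tracked, so later mutations are
    # persisted automatically at appropriate points.
    state = await kvs.get_auto_saved_value('crawl_state', {'processed': 0})
    state['processed'] += 1

    # Force persistence of all auto-saved values right away.
    await kvs.persist_autosaved_values()


if __name__ == '__main__':
    asyncio.run(main())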

Request queue

  • Properties:
    • id
    • name
    • metadata
  • Methods:
    • open
    • purge (new method)
    • drop
    • add_request
    • add_requests_batched -> add_requests
    • fetch_next_request
    • get_request
    • mark_request_as_handled
    • reclaim_request
    • is_empty
    • is_finished
  • Breaking changes:
    • from_storage_object method has been removed - Use the open method with name or id instead.
    • get_info -> metadata property
    • storage_object -> metadata property
    • set_metadata method has been removed (it wasn't propagated to clients)
      • Do we want to support it (e.g. for renaming)?
    • get_handled_count method has been removed - Use metadata.handled_request_count instead.
    • get_total_count method has been removed - Use metadata.total_request_count instead.
    • resource_directory from the RequestQueueMetadata was removed; use the path_to... property instead.
    • RequestQueueHead model has been removed - Use RequestQueueHeadWithLocks instead.
  • Notes:
    • The new RQ add_requests method accepts a forefront argument (the Apify API supports it), demonstrated in the example below.
import asyncio

from crawlee import Request
from crawlee.configuration import Configuration
from crawlee.storage_clients import FileSystemStorageClient
from crawlee.storages import RequestQueue


async def main() -> None:
    rq = await RequestQueue.open(
        name='my-queue',
        storage_client=FileSystemStorageClient(),
        configuration=Configuration(purge_on_start=True),
    )

    print(f'RequestQueue: {rq}')
    print(f'RequestQueue client: {rq._client}')

    await rq.add_requests(
        requests=[
            Request.from_url('https://example.com', use_extended_unique_key=True),
            Request.from_url('https://crawlee.dev', use_extended_unique_key=True),
            Request.from_url('https://apify.com', use_extended_unique_key=True),
        ],
    )

    print('Requests were added to the queue')

    is_empty = await rq.is_empty()
    is_finished = await rq.is_finished()

    print(f'Is empty: {is_empty}')
    print(f'Is finished: {is_finished}')

    request = await rq.fetch_next_request()
    print(f'Fetched request: {request}')

    await rq.add_request('https://facebook.com', forefront=True)

    request = await rq.fetch_next_request()
    print(f'Fetched request: {request}')

    rq_default = await RequestQueue.open(
        storage_client=FileSystemStorageClient(),
        configuration=Configuration(purge_on_start=True),
    )

    await rq_default.add_request('https://example.com/1')
    await rq_default.add_requests(
        [
            'https://example.com/priority-1',
            'https://example.com/priority-2',
            'https://example.com/priority-3',
        ]
    )
    await rq_default.add_request('https://example.com/2')


if __name__ == '__main__':
    asyncio.run(main())

BaseDatasetClient

  • Properties:
    • metadata
  • Methods:
    • open
    • purge
    • drop
    • push_data
    • get_data
    • iterate_items

BaseKeyValueStoreClient

  • Properties:
    • metadata
  • Methods:
    • open
    • purge
    • drop
    • get_value
    • set_value
    • delete_value
    • iterate_keys
    • get_public_url

BaseRequestQueueClient

  • Properties:
    • metadata
  • Methods:
    • open
    • purge
    • drop
    • add_requests_batch -> add_batch_of_requests (one backend method serving the two frontend methods, add_request and add_requests)
    • get_request
    • fetch_next_request
    • mark_request_as_handled
    • reclaim_request
    • is_empty
  • Models
    • RequestQueueHeadWithLocks -> RequestQueueHead
    • BatchRequestsOperationResponse -> AddRequestsResponse
  • Notes:
    • The old file-system (memory) version didn't persist in-progress requests.
    • The old file-system (memory) version didn't persist forefront values (there is now an FS-specific _sequence field in the FS Request); see the sketch below.
    • The methods for manipulating locks and listing heads are now internal methods of the Apify RQ client.
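
To illustrate the forefront-persistence note: a hypothetical sketch of how a signed sequence counter can make forefront ordering survive restarts. The counter scheme and helper names are invented for illustration; only the _sequence field name comes from this PR:

import itertools

# Regular requests get increasing positive numbers; forefront requests get
# decreasing negative numbers, so the newest forefront request sorts first.
_counter = itertools.count(1)
_forefront_counter = itertools.count(-1, -1)


def next_sequence(*, forefront: bool) -> int:
    return next(_forefront_counter) if forefront else next(_counter)


records = [
    {'url': 'https://example.com/a', '_sequence': next_sequence(forefront=False)},
    {'url': 'https://example.com/b', '_sequence': next_sequence(forefront=True)},
    {'url': 'https://example.com/c', '_sequence': next_sequence(forefront=False)},
]

# After reloading records from disk, sorting by _sequence restores the
# queue order: b (forefront) first, then a, then c.
for record in sorted(records, key=lambda r: r['_sequence']):
    print(record['url'])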

Issues

Testing

  • The original tests were mostly removed and replaced with new ones.
  • Each storage-client implementation now has its own dedicated tests at the client level (more targeted/edge-case coverage).
  • On top of that, there are storage-level tests that use a parametrized fixture for each storage client (file-system and memory), ensuring every storage test runs against every client implementation; a sketch follows below.
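
A minimal sketch of what such a parametrized fixture can look like (illustrative only, not the actual test code; assumes pytest-asyncio in auto mode):

import pytest

from crawlee.storage_clients import FileSystemStorageClient, MemoryStorageClient
from crawlee.storages import Dataset


# Each test that requests this fixture runs once per client implementation.
@pytest.fixture(params=[MemoryStorageClient, FileSystemStorageClient])
def storage_client(request: pytest.FixtureRequest):
    return request.param()


async def test_push_and_get(storage_client) -> None:
    dataset = await Dataset.open(storage_client=storage_client)
    await dataset.push_data({'key': 'value'})
    items = await dataset.get_data()
    assert items.count == 1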

Checklist

  • CI passed

@vdusek vdusek added this to the 114th sprint - Tooling team milestone May 10, 2025
@vdusek vdusek self-assigned this May 10, 2025
@vdusek vdusek added the t-tooling label May 10, 2025
@vdusek vdusek force-pushed the new-storage-clients branch from 8758ca9 to 6b7b8bd on May 10, 2025 15:50
@Pijukatel (Collaborator) left a comment

Publishing some comments, not finished with reviewing the whole change yet.

@Pijukatel Pijukatel self-requested a review May 14, 2025 08:15
@Pijukatel (Collaborator) left a comment

That was a loooot of work. Very nice!

@vdusek vdusek requested a review from Pijukatel May 15, 2025 10:29
@Pijukatel (Collaborator) left a comment

I think it is good. Just please add a section to upgrading_to_v0x.md to summarize all the breaking changes in this.

@Mantisus (Collaborator) commented

That's excellent work!

@janbuchar (Collaborator) left a comment

I like this a lot. I do need to revisit the request queue related code though, it feels like we're throwing out the baby with the bathwater.

@vdusek vdusek requested a review from Copilot May 19, 2025 06:58
Copilot AI (Contributor) left a comment

Pull Request Overview

This PR refactors the storage client system by removing legacy implementations and utilities, consolidating configuration, and updating documentation and examples to use the new clients.

  • Consolidated storage-related settings in Configuration and removed deprecated options.
  • Replaced legacy file utilities with infer_mime_type, atomic_write, and export-to-stream functions.
  • Updated service locator to default to FileSystemStorageClient and revised examples to use new storage clients.

Reviewed Changes

Copilot reviewed 92 out of 92 changed files in this pull request and generated 3 comments.

Summary per file:
  • src/crawlee/configuration.py: Simplified storage configuration fields to align with new clients.
  • src/crawlee/_utils/file.py: Removed old file helpers; added atomic writes, MIME inference, and stream exports.
  • src/crawlee/_service_locator.py: Changed default storage client to FileSystemStorageClient.
  • docs/deployment/code_examples/google/google_example.py: Updated cloud function example to use MemoryStorageClient.
  • docs/guides/request_loaders.mdx: Documentation updated to reflect handled_count and total_count properties.
Comments suppressed due to low confidence (1)

docs/deployment/code_examples/google/google_example.py:19

  • The example uses timedelta but does not import it. Add from datetime import timedelta at the top of the file to avoid a NameError (see the corrected sketch below).
request_handler_timeout=timedelta(seconds=30),
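
The fix in context: a hypothetical reconstruction of the relevant part of google_example.py, where only the timedelta usage is grounded in the comment above; the crawler class and other options in the real example file may differ.

from datetime import timedelta  # the import the example was missing

from crawlee.crawlers import BeautifulSoupCrawler

# Hypothetical surrounding code for illustration only.
crawler = BeautifulSoupCrawler(
    request_handler_timeout=timedelta(seconds=30),
)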

@vdusek vdusek force-pushed the new-storage-clients branch 4 times, most recently from d7b19ee to 7f2e6b0 on June 6, 2025 13:15
@vdusek vdusek force-pushed the new-storage-clients branch from 9bad9db to 65a1361 on June 9, 2025 15:12
Labels: t-tooling
Projects: none yet
4 participants