fix(datasets): Add parameter to enable/disable lazy saving for PartitionedDataset #978

Merged · 21 commits · merged on Jan 22, 2025
14 changes: 7 additions & 7 deletions .secrets.baseline
@@ -129,7 +129,7 @@
"filename": "kedro-datasets/kedro_datasets/dask/parquet_dataset.py",
"hashed_secret": "6e1d66a1596528c308e601c10aa0b92d53606ab9",
"is_verified": false,
"line_number": 71
"line_number": 72
}
],
"kedro-datasets/kedro_datasets/pandas/sql_dataset.py": [
@@ -340,35 +340,35 @@
"filename": "kedro-datasets/tests/partitions/test_partitioned_dataset.py",
"hashed_secret": "76f747de912e8682e29a23cb506dd5bf0de080d2",
"is_verified": false,
"line_number": 415
"line_number": 438
},
{
"type": "Secret Keyword",
"filename": "kedro-datasets/tests/partitions/test_partitioned_dataset.py",
"hashed_secret": "9027cc5a2c1321de60a2d71ccde6229d1152d6d3",
"is_verified": false,
"line_number": 416
"line_number": 439
},
{
"type": "Secret Keyword",
"filename": "kedro-datasets/tests/partitions/test_partitioned_dataset.py",
"hashed_secret": "5dcbdf371f181b9b7a41a4be7be70f8cbee67da7",
"is_verified": false,
"line_number": 452
"line_number": 475
},
{
"type": "Secret Keyword",
"filename": "kedro-datasets/tests/partitions/test_partitioned_dataset.py",
"hashed_secret": "727d8ff68b6b550f2cf6e737b3cad5149c65fe5b",
"is_verified": false,
"line_number": 503
"line_number": 526
},
{
"type": "Secret Keyword",
"filename": "kedro-datasets/tests/partitions/test_partitioned_dataset.py",
"hashed_secret": "adb5fabe51f5b45e83fdd91b71c92156fec4a63e",
"is_verified": false,
"line_number": 523
"line_number": 546
}
],
"kedro-datasets/tests/plotly/test_html_dataset.py": [
@@ -490,5 +490,5 @@
}
]
},
"generated_at": "2025-01-13T16:27:46Z"
"generated_at": "2025-01-15T15:25:24Z"
}
1 change: 1 addition & 0 deletions kedro-datasets/RELEASE.md
@@ -1,6 +1,7 @@
# Upcoming Release
## Major features and improvements

- Added a parameter to enable/disable lazy saving for `PartitionedDataset`.
- Replaced `trufflehog` with `detect-secrets` for detecting secrets within a code base.

## Bug fixes and other changes
5 changes: 3 additions & 2 deletions kedro-datasets/kedro_datasets/dask/csv_dataset.py
@@ -1,5 +1,6 @@
"""``CSVDataset`` is a dataset used to load and save data to CSV files using Dask
dataframe"""

from __future__ import annotations

from copy import deepcopy
@@ -13,7 +14,7 @@
class CSVDataset(AbstractDataset[dd.DataFrame, dd.DataFrame]):
"""``CSVDataset`` loads and saves data to comma-separated value file(s). It uses Dask
remote data services to handle the corresponding load and save operations:
https://docs.dask.org/en/latest/how-to/connect-to-remote-data.html
https://docs.dask.org/en/stable/how-to/connect-to-remote-data.html

Example usage for the
`YAML API <https://docs.kedro.org/en/stable/data/data_catalog_yaml_examples.html>`_:
@@ -73,7 +74,7 @@ def __init__( # noqa: PLR0913
credentials: Credentials required to get access to the underlying filesystem.
E.g. for ``GCSFileSystem`` it should look like `{"token": None}`.
fs_args: Optional parameters to the backend file system driver:
https://docs.dask.org/en/latest/how-to/connect-to-remote-data.html#optional-parameters
https://docs.dask.org/en/stable/how-to/connect-to-remote-data.html#optional-parameters
metadata: Any arbitrary metadata.
This is ignored by Kedro, but may be consumed by users or external plugins.
"""
5 changes: 3 additions & 2 deletions kedro-datasets/kedro_datasets/dask/parquet_dataset.py
@@ -1,5 +1,6 @@
"""``ParquetDataset`` is a dataset used to load and save data to parquet files using Dask
dataframe"""

from __future__ import annotations

from copy import deepcopy
@@ -14,7 +15,7 @@
class ParquetDataset(AbstractDataset[dd.DataFrame, dd.DataFrame]):
"""``ParquetDataset`` loads and saves data to parquet file(s). It uses Dask
remote data services to handle the corresponding load and save operations:
https://docs.dask.org/en/latest/how-to/connect-to-remote-data.html
https://docs.dask.org/en/stable/how-to/connect-to-remote-data.html

Example usage for the
`YAML API <https://docs.kedro.org/en/stable/data/data_catalog_yaml_examples.html>`_:
@@ -103,7 +104,7 @@ def __init__( # noqa: PLR0913
credentials: Credentials required to get access to the underlying filesystem.
E.g. for ``GCSFileSystem`` it should look like `{"token": None}`.
fs_args: Optional parameters to the backend file system driver:
https://docs.dask.org/en/latest/how-to/connect-to-remote-data.html#optional-parameters
https://docs.dask.org/en/stable/how-to/connect-to-remote-data.html#optional-parameters
metadata: Any arbitrary metadata.
This is ignored by Kedro, but may be consumed by users or external plugins.
"""
10 changes: 9 additions & 1 deletion kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py
@@ -69,6 +69,7 @@ class PartitionedDataset(AbstractDataset[dict[str, Any], dict[str, Callable[[],
sep: '\\t'
index: true
filename_suffix: '.dat'
save_lazily: True

Example usage for the
`Python API <https://docs.kedro.org/en/stable/data/\
@@ -93,6 +94,7 @@ class PartitionedDataset(AbstractDataset[dict[str, Any], dict[str, Callable[[],
... path=str(tmp_path / "df_with_partition"),
... dataset="pandas.CSVDataset",
... filename_suffix=".csv",
... save_lazily=False
... )
>>> # This will create a folder `df_with_partition` and save multiple files
>>> # with the dict key + filename_suffix as filename, i.e. 1.csv, 2.csv etc.
@@ -152,6 +154,7 @@ def __init__( # noqa: PLR0913
load_args: dict[str, Any] | None = None,
fs_args: dict[str, Any] | None = None,
overwrite: bool = False,
save_lazily: bool = True,
metadata: dict[str, Any] | None = None,
) -> None:
"""Creates a new instance of ``PartitionedDataset``.
@@ -191,6 +194,10 @@ def __init__( # noqa: PLR0913
fs_args: Extra arguments to pass into underlying filesystem class constructor
(e.g. `{"project": "my-project"}` for ``GCSFileSystem``).
overwrite: If True, any existing partitions will be removed.
save_lazily: Parameter to enable/disable lazy saving; the default is True, meaning that if a callable
object is passed as the data to save, the partition's data will not be materialised until it is time to write.
Lazy saving example:
https://docs.kedro.org/en/stable/data/kedro_io.html#partitioned-dataset-lazy-saving
metadata: Any arbitrary metadata.
This is ignored by Kedro, but may be consumed by users or external plugins.

@@ -206,6 +213,7 @@ def __init__( # noqa: PLR0913
self._overwrite = overwrite
self._protocol = infer_storage_options(self._path)["protocol"]
self._partition_cache: Cache = Cache(maxsize=1)
self._save_lazily = save_lazily
self.metadata = metadata

dataset = dataset if isinstance(dataset, dict) else {"type": dataset}
@@ -311,7 +319,7 @@ def save(self, data: dict[str, Any]) -> None:
# join the protocol back since tools like PySpark may rely on it
kwargs[self._filepath_arg] = self._join_protocol(partition)
dataset = self._dataset_type(**kwargs) # type: ignore
if callable(partition_data):
if callable(partition_data) and self._save_lazily:
partition_data = partition_data() # noqa: PLW2901
dataset.save(partition_data)
self._invalidate_caches()
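The change to `save()` above only materialises a callable when `self._save_lazily` is true; otherwise the callable itself is handed to the underlying dataset. A minimal sketch of both paths, under the assumption of illustrative local paths (the CSV and pickle dataset choices mirror the docstring example and the new test):

```python
# Sketch of save_lazily behaviour; paths are illustrative, not from the PR.
import pandas as pd
from kedro_datasets.partitions import PartitionedDataset


def build_partition() -> pd.DataFrame:
    # With the default save_lazily=True this is only invoked at write time.
    return pd.DataFrame({"foo": [1, 2], "bar": ["a", "b"]})


# Default (lazy) path: the callable is called during save() and its DataFrame
# is written as a CSV partition.
lazy_pds = PartitionedDataset(
    path="data/partitions_lazy",  # illustrative path
    dataset="pandas.CSVDataset",
    filename_suffix=".csv",
)
lazy_pds.save({"part_1": build_partition})

# Eager path (save_lazily=False): the callable itself is passed to the underlying
# dataset, so a dataset that can serialise arbitrary objects is needed, as in the
# new test_callable_save test.
eager_pds = PartitionedDataset(
    path="data/partitions_eager",  # illustrative path
    dataset="kedro_datasets.pickle.PickleDataset",
    save_lazily=False,
)
eager_pds.save({"part_1": build_partition})
```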
23 changes: 23 additions & 0 deletions kedro-datasets/tests/partitions/test_partitioned_dataset.py
@@ -52,6 +52,10 @@ def filepath_csvs(tmp_path):
]


def original_data_callable():
return pd.DataFrame({"foo": 42, "bar": ["a", "b", None]})


class FakeDataset: # pylint: disable=too-few-public-methods
pass

@@ -101,6 +105,25 @@ def test_save(self, dataset, local_csvs, suffix):
reloaded_data = loaded_partitions[part_id]()
assert_frame_equal(reloaded_data, original_data)

@pytest.mark.parametrize("dataset", ["kedro_datasets.pickle.PickleDataset"])
@pytest.mark.parametrize("suffix", ["", ".csv"])
def test_callable_save(self, dataset, local_csvs, suffix):
pds = PartitionedDataset(
path=str(local_csvs),
dataset=dataset,
filename_suffix=suffix,
save_lazily=False,
)

part_id = "new/data"
pds.save({part_id: original_data_callable})

assert (local_csvs / "new" / ("data" + suffix)).is_file()
loaded_partitions = pds.load()
assert part_id in loaded_partitions
reloaded_data = loaded_partitions[part_id]()
assert reloaded_data == original_data_callable

@pytest.mark.parametrize("dataset", LOCAL_DATASET_DEFINITION)
@pytest.mark.parametrize("suffix", ["", ".csv"])
def test_lazy_save(self, dataset, local_csvs, suffix):
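For the lazy path that remains the default, the documentation linked from the docstring describes returning a dictionary of callables from a node; a minimal sketch of that pattern (node and partition names are illustrative):

```python
# Lazy-saving pattern per
# https://docs.kedro.org/en/stable/data/kedro_io.html#partitioned-dataset-lazy-saving
# With save_lazily=True (the default), each callable is only invoked when its
# partition is written. Node and partition names are illustrative.
import pandas as pd


def create_partitions() -> dict:
    """Node output to be saved by a PartitionedDataset catalog entry."""
    return {
        "part_a": lambda: pd.DataFrame({"foo": [1], "bar": ["a"]}),
        "part_b": lambda: pd.DataFrame({"foo": [2], "bar": ["b"]}),
    }
```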