From 1a5e0faa7044dc73e7ee2be0fec38c8d2fa2a83c Mon Sep 17 00:00:00 2001 From: ElenaKhaustova <157851531+ElenaKhaustova@users.noreply.github.com> Date: Wed, 22 Jan 2025 12:28:30 +0000 Subject: [PATCH] fix(datasets): Add parameter to enable/disable lazy saving for `PartitionedDataset` (#978) * Replaced callable check Signed-off-by: Elena Khaustova * Updated lazy_save test Signed-off-by: Elena Khaustova * Added test_callable_save Signed-off-by: Elena Khaustova * Fixed lint Signed-off-by: Elena Khaustova * Fixed docs links Signed-off-by: Elena Khaustova * Fixed all docs links Signed-off-by: Elena Khaustova * Updated release notes Signed-off-by: Elena Khaustova * Fixed all docs links Signed-off-by: Elena Khaustova * Fixed typo Signed-off-by: Elena Khaustova * Added argument to disable lazy saving Signed-off-by: Elena Khaustova * Removed save function argument Signed-off-by: Elena Khaustova * Updated unit test Signed-off-by: Elena Khaustova * Fixed lint Signed-off-by: Elena Khaustova * Updated related docs Signed-off-by: Elena Khaustova * Revert test changes Signed-off-by: Elena Khaustova * Updated baseline Signed-off-by: Elena Khaustova * Updated release notes Signed-off-by: Elena Khaustova * Updated release notes Signed-off-by: Elena Khaustova * Updated docstrings Signed-off-by: Elena Khaustova --------- Signed-off-by: Elena Khaustova --- .secrets.baseline | 14 +++++------ kedro-datasets/RELEASE.md | 1 + .../kedro_datasets/dask/csv_dataset.py | 5 ++-- .../kedro_datasets/dask/parquet_dataset.py | 5 ++-- .../partitions/partitioned_dataset.py | 10 +++++++- .../partitions/test_partitioned_dataset.py | 23 +++++++++++++++++++ 6 files changed, 46 insertions(+), 12 deletions(-) diff --git a/.secrets.baseline b/.secrets.baseline index ce3799e06..c18f3f6f1 100644 --- a/.secrets.baseline +++ b/.secrets.baseline @@ -129,7 +129,7 @@ "filename": "kedro-datasets/kedro_datasets/dask/parquet_dataset.py", "hashed_secret": "6e1d66a1596528c308e601c10aa0b92d53606ab9", "is_verified": false, 
- "line_number": 71 + "line_number": 72 } ], "kedro-datasets/kedro_datasets/pandas/sql_dataset.py": [ @@ -340,35 +340,35 @@ "filename": "kedro-datasets/tests/partitions/test_partitioned_dataset.py", "hashed_secret": "76f747de912e8682e29a23cb506dd5bf0de080d2", "is_verified": false, - "line_number": 415 + "line_number": 438 }, { "type": "Secret Keyword", "filename": "kedro-datasets/tests/partitions/test_partitioned_dataset.py", "hashed_secret": "9027cc5a2c1321de60a2d71ccde6229d1152d6d3", "is_verified": false, - "line_number": 416 + "line_number": 439 }, { "type": "Secret Keyword", "filename": "kedro-datasets/tests/partitions/test_partitioned_dataset.py", "hashed_secret": "5dcbdf371f181b9b7a41a4be7be70f8cbee67da7", "is_verified": false, - "line_number": 452 + "line_number": 475 }, { "type": "Secret Keyword", "filename": "kedro-datasets/tests/partitions/test_partitioned_dataset.py", "hashed_secret": "727d8ff68b6b550f2cf6e737b3cad5149c65fe5b", "is_verified": false, - "line_number": 503 + "line_number": 526 }, { "type": "Secret Keyword", "filename": "kedro-datasets/tests/partitions/test_partitioned_dataset.py", "hashed_secret": "adb5fabe51f5b45e83fdd91b71c92156fec4a63e", "is_verified": false, - "line_number": 523 + "line_number": 546 } ], "kedro-datasets/tests/plotly/test_html_dataset.py": [ @@ -490,5 +490,5 @@ } ] }, - "generated_at": "2025-01-13T16:27:46Z" + "generated_at": "2025-01-15T15:25:24Z" } diff --git a/kedro-datasets/RELEASE.md b/kedro-datasets/RELEASE.md index 15c13da84..820388766 100755 --- a/kedro-datasets/RELEASE.md +++ b/kedro-datasets/RELEASE.md @@ -1,6 +1,7 @@ # Upcoming Release ## Major features and improvements +- Added a parameter to enable/disable lazy saving for `PartitionedDataset`. - Replaced `trufflehog` with `detect-secrets` for detecting secrets within a code base. 
## Bug fixes and other changes diff --git a/kedro-datasets/kedro_datasets/dask/csv_dataset.py b/kedro-datasets/kedro_datasets/dask/csv_dataset.py index bc5b5764b..b82bff15e 100644 --- a/kedro-datasets/kedro_datasets/dask/csv_dataset.py +++ b/kedro-datasets/kedro_datasets/dask/csv_dataset.py @@ -1,5 +1,6 @@ """``CSVDataset`` is a dataset used to load and save data to CSV files using Dask dataframe""" + from __future__ import annotations from copy import deepcopy @@ -13,7 +14,7 @@ class CSVDataset(AbstractDataset[dd.DataFrame, dd.DataFrame]): """``CSVDataset`` loads and saves data to comma-separated value file(s). It uses Dask remote data services to handle the corresponding load and save operations: - https://docs.dask.org/en/latest/how-to/connect-to-remote-data.html + https://docs.dask.org/en/stable/how-to/connect-to-remote-data.html Example usage for the `YAML API `_: @@ -73,7 +74,7 @@ def __init__( # noqa: PLR0913 credentials: Credentials required to get access to the underlying filesystem. E.g. for ``GCSFileSystem`` it should look like `{"token": None}`. fs_args: Optional parameters to the backend file system driver: - https://docs.dask.org/en/latest/how-to/connect-to-remote-data.html#optional-parameters + https://docs.dask.org/en/stable/how-to/connect-to-remote-data.html#optional-parameters metadata: Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins. 
""" diff --git a/kedro-datasets/kedro_datasets/dask/parquet_dataset.py b/kedro-datasets/kedro_datasets/dask/parquet_dataset.py index 3b2dff73e..b3a81c632 100644 --- a/kedro-datasets/kedro_datasets/dask/parquet_dataset.py +++ b/kedro-datasets/kedro_datasets/dask/parquet_dataset.py @@ -1,5 +1,6 @@ """``ParquetDataset`` is a dataset used to load and save data to parquet files using Dask dataframe""" + from __future__ import annotations from copy import deepcopy @@ -14,7 +15,7 @@ class ParquetDataset(AbstractDataset[dd.DataFrame, dd.DataFrame]): """``ParquetDataset`` loads and saves data to parquet file(s). It uses Dask remote data services to handle the corresponding load and save operations: - https://docs.dask.org/en/latest/how-to/connect-to-remote-data.html + https://docs.dask.org/en/stable/how-to/connect-to-remote-data.html Example usage for the `YAML API `_: @@ -103,7 +104,7 @@ def __init__( # noqa: PLR0913 credentials: Credentials required to get access to the underlying filesystem. E.g. for ``GCSFileSystem`` it should look like `{"token": None}`. fs_args: Optional parameters to the backend file system driver: - https://docs.dask.org/en/latest/how-to/connect-to-remote-data.html#optional-parameters + https://docs.dask.org/en/stable/how-to/connect-to-remote-data.html#optional-parameters metadata: Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins. 
""" diff --git a/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py b/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py index ea2461034..cf1069b1a 100644 --- a/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py +++ b/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py @@ -69,6 +69,7 @@ class PartitionedDataset(AbstractDataset[dict[str, Any], dict[str, Callable[[], sep: '\\t' index: true filename_suffix: '.dat' + save_lazily: True Example usage for the `Python API >> # This will create a folder `df_with_partition` and save multiple files >>> # with the dict key + filename_suffix as filename, i.e. 1.csv, 2.csv etc. @@ -152,6 +154,7 @@ def __init__( # noqa: PLR0913 load_args: dict[str, Any] | None = None, fs_args: dict[str, Any] | None = None, overwrite: bool = False, + save_lazily: bool = True, metadata: dict[str, Any] | None = None, ) -> None: """Creates a new instance of ``PartitionedDataset``. @@ -191,6 +194,10 @@ def __init__( # noqa: PLR0913 fs_args: Extra arguments to pass into underlying filesystem class constructor (e.g. `{"project": "my-project"}` for ``GCSFileSystem``). overwrite: If True, any existing partitions will be removed. + save_lazily: Parameter to enable/disable lazy saving, the default is True. Meaning that if callable object + is passed as data to save, the partition’s data will not be materialised until it is time to write. + Lazy saving example: + https://docs.kedro.org/en/stable/data/kedro_io.html#partitioned-dataset-lazy-saving metadata: Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins. 
@@ -206,6 +213,7 @@ def __init__( # noqa: PLR0913 self._overwrite = overwrite self._protocol = infer_storage_options(self._path)["protocol"] self._partition_cache: Cache = Cache(maxsize=1) + self._save_lazily = save_lazily self.metadata = metadata dataset = dataset if isinstance(dataset, dict) else {"type": dataset} @@ -311,7 +319,7 @@ def save(self, data: dict[str, Any]) -> None: # join the protocol back since tools like PySpark may rely on it kwargs[self._filepath_arg] = self._join_protocol(partition) dataset = self._dataset_type(**kwargs) # type: ignore - if callable(partition_data): + if callable(partition_data) and self._save_lazily: partition_data = partition_data() # noqa: PLW2901 dataset.save(partition_data) self._invalidate_caches() diff --git a/kedro-datasets/tests/partitions/test_partitioned_dataset.py b/kedro-datasets/tests/partitions/test_partitioned_dataset.py index f0126887d..9a49d3bb8 100644 --- a/kedro-datasets/tests/partitions/test_partitioned_dataset.py +++ b/kedro-datasets/tests/partitions/test_partitioned_dataset.py @@ -52,6 +52,10 @@ def filepath_csvs(tmp_path): ] +def original_data_callable(): + return pd.DataFrame({"foo": 42, "bar": ["a", "b", None]}) + + class FakeDataset: # pylint: disable=too-few-public-methods pass @@ -101,6 +105,25 @@ def test_save(self, dataset, local_csvs, suffix): reloaded_data = loaded_partitions[part_id]() assert_frame_equal(reloaded_data, original_data) + @pytest.mark.parametrize("dataset", ["kedro_datasets.pickle.PickleDataset"]) + @pytest.mark.parametrize("suffix", ["", ".csv"]) + def test_callable_save(self, dataset, local_csvs, suffix): + pds = PartitionedDataset( + path=str(local_csvs), + dataset=dataset, + filename_suffix=suffix, + save_lazily=False, + ) + + part_id = "new/data" + pds.save({part_id: original_data_callable}) + + assert (local_csvs / "new" / ("data" + suffix)).is_file() + loaded_partitions = pds.load() + assert part_id in loaded_partitions + reloaded_data = loaded_partitions[part_id]() + 
assert reloaded_data == original_data_callable + @pytest.mark.parametrize("dataset", LOCAL_DATASET_DEFINITION) @pytest.mark.parametrize("suffix", ["", ".csv"]) def test_lazy_save(self, dataset, local_csvs, suffix):