fix(datasets): Add parameter to enable/disable lazy saving for `PartitionedDataset` (#978)

* Replaced callable check

Signed-off-by: Elena Khaustova <[email protected]>

* Updated lazy_save test

Signed-off-by: Elena Khaustova <[email protected]>

* Added test_callable_save

Signed-off-by: Elena Khaustova <[email protected]>

* Fixed lint

Signed-off-by: Elena Khaustova <[email protected]>

* Fixed docs links

Signed-off-by: Elena Khaustova <[email protected]>

* Fixed all docs links

Signed-off-by: Elena Khaustova <[email protected]>

* Updated release notes

Signed-off-by: Elena Khaustova <[email protected]>

* Fixed all docs links

Signed-off-by: Elena Khaustova <[email protected]>

* Fixed typo

Signed-off-by: Elena Khaustova <[email protected]>

* Added argument to disable lazy saving

Signed-off-by: Elena Khaustova <[email protected]>

* Removed save function argument

Signed-off-by: Elena Khaustova <[email protected]>

* Updated unit test

Signed-off-by: Elena Khaustova <[email protected]>

* Fixed lint

Signed-off-by: Elena Khaustova <[email protected]>

* Updated related docs

Signed-off-by: Elena Khaustova <[email protected]>

* Revert test changes

Signed-off-by: Elena Khaustova <[email protected]>

* Updated baseline

Signed-off-by: Elena Khaustova <[email protected]>

* Updated release notes

Signed-off-by: Elena Khaustova <[email protected]>

* Updated release notes

Signed-off-by: Elena Khaustova <[email protected]>

* Updated docstrings

Signed-off-by: Elena Khaustova <[email protected]>

---------

Signed-off-by: Elena Khaustova <[email protected]>
ElenaKhaustova authored Jan 22, 2025
1 parent 2dc8f16 commit 1a5e0fa
Showing 6 changed files with 46 additions and 12 deletions.
14 changes: 7 additions & 7 deletions .secrets.baseline
@@ -129,7 +129,7 @@
"filename": "kedro-datasets/kedro_datasets/dask/parquet_dataset.py",
"hashed_secret": "6e1d66a1596528c308e601c10aa0b92d53606ab9",
"is_verified": false,
"line_number": 71
"line_number": 72
}
],
"kedro-datasets/kedro_datasets/pandas/sql_dataset.py": [
@@ -340,35 +340,35 @@
"filename": "kedro-datasets/tests/partitions/test_partitioned_dataset.py",
"hashed_secret": "76f747de912e8682e29a23cb506dd5bf0de080d2",
"is_verified": false,
"line_number": 415
"line_number": 438
},
{
"type": "Secret Keyword",
"filename": "kedro-datasets/tests/partitions/test_partitioned_dataset.py",
"hashed_secret": "9027cc5a2c1321de60a2d71ccde6229d1152d6d3",
"is_verified": false,
"line_number": 416
"line_number": 439
},
{
"type": "Secret Keyword",
"filename": "kedro-datasets/tests/partitions/test_partitioned_dataset.py",
"hashed_secret": "5dcbdf371f181b9b7a41a4be7be70f8cbee67da7",
"is_verified": false,
"line_number": 452
"line_number": 475
},
{
"type": "Secret Keyword",
"filename": "kedro-datasets/tests/partitions/test_partitioned_dataset.py",
"hashed_secret": "727d8ff68b6b550f2cf6e737b3cad5149c65fe5b",
"is_verified": false,
"line_number": 503
"line_number": 526
},
{
"type": "Secret Keyword",
"filename": "kedro-datasets/tests/partitions/test_partitioned_dataset.py",
"hashed_secret": "adb5fabe51f5b45e83fdd91b71c92156fec4a63e",
"is_verified": false,
"line_number": 523
"line_number": 546
}
],
"kedro-datasets/tests/plotly/test_html_dataset.py": [
@@ -490,5 +490,5 @@
}
]
},
"generated_at": "2025-01-13T16:27:46Z"
"generated_at": "2025-01-15T15:25:24Z"
}
1 change: 1 addition & 0 deletions kedro-datasets/RELEASE.md
@@ -1,6 +1,7 @@
# Upcoming Release
## Major features and improvements

- Added a parameter to enable/disable lazy saving for `PartitionedDataset`.
- Replaced `trufflehog` with `detect-secrets` for detecting secrets within a code base.

## Bug fixes and other changes
5 changes: 3 additions & 2 deletions kedro-datasets/kedro_datasets/dask/csv_dataset.py
@@ -1,5 +1,6 @@
"""``CSVDataset`` is a dataset used to load and save data to CSV files using Dask
dataframe"""

from __future__ import annotations

from copy import deepcopy
@@ -13,7 +14,7 @@
class CSVDataset(AbstractDataset[dd.DataFrame, dd.DataFrame]):
"""``CSVDataset`` loads and saves data to comma-separated value file(s). It uses Dask
remote data services to handle the corresponding load and save operations:
https://docs.dask.org/en/latest/how-to/connect-to-remote-data.html
https://docs.dask.org/en/stable/how-to/connect-to-remote-data.html
Example usage for the
`YAML API <https://docs.kedro.org/en/stable/data/data_catalog_yaml_examples.html>`_:
@@ -73,7 +74,7 @@ def __init__( # noqa: PLR0913
credentials: Credentials required to get access to the underlying filesystem.
E.g. for ``GCSFileSystem`` it should look like `{"token": None}`.
fs_args: Optional parameters to the backend file system driver:
https://docs.dask.org/en/latest/how-to/connect-to-remote-data.html#optional-parameters
https://docs.dask.org/en/stable/how-to/connect-to-remote-data.html#optional-parameters
metadata: Any arbitrary metadata.
This is ignored by Kedro, but may be consumed by users or external plugins.
"""
5 changes: 3 additions & 2 deletions kedro-datasets/kedro_datasets/dask/parquet_dataset.py
@@ -1,5 +1,6 @@
"""``ParquetDataset`` is a dataset used to load and save data to parquet files using Dask
dataframe"""

from __future__ import annotations

from copy import deepcopy
@@ -14,7 +15,7 @@
class ParquetDataset(AbstractDataset[dd.DataFrame, dd.DataFrame]):
"""``ParquetDataset`` loads and saves data to parquet file(s). It uses Dask
remote data services to handle the corresponding load and save operations:
https://docs.dask.org/en/latest/how-to/connect-to-remote-data.html
https://docs.dask.org/en/stable/how-to/connect-to-remote-data.html
Example usage for the
`YAML API <https://docs.kedro.org/en/stable/data/data_catalog_yaml_examples.html>`_:
@@ -103,7 +104,7 @@ def __init__( # noqa: PLR0913
credentials: Credentials required to get access to the underlying filesystem.
E.g. for ``GCSFileSystem`` it should look like `{"token": None}`.
fs_args: Optional parameters to the backend file system driver:
https://docs.dask.org/en/latest/how-to/connect-to-remote-data.html#optional-parameters
https://docs.dask.org/en/stable/how-to/connect-to-remote-data.html#optional-parameters
metadata: Any arbitrary metadata.
This is ignored by Kedro, but may be consumed by users or external plugins.
"""
10 changes: 9 additions & 1 deletion kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py
@@ -69,6 +69,7 @@ class PartitionedDataset(AbstractDataset[dict[str, Any], dict[str, Callable[[],
sep: '\\t'
index: true
filename_suffix: '.dat'
save_lazily: True
Example usage for the
`Python API <https://docs.kedro.org/en/stable/data/\
@@ -93,6 +94,7 @@ class PartitionedDataset(AbstractDataset[dict[str, Any], dict[str, Callable[[],
... path=str(tmp_path / "df_with_partition"),
... dataset="pandas.CSVDataset",
... filename_suffix=".csv",
... save_lazily=False
... )
>>> # This will create a folder `df_with_partition` and save multiple files
>>> # with the dict key + filename_suffix as filename, i.e. 1.csv, 2.csv etc.
@@ -152,6 +154,7 @@ def __init__( # noqa: PLR0913
load_args: dict[str, Any] | None = None,
fs_args: dict[str, Any] | None = None,
overwrite: bool = False,
save_lazily: bool = True,
metadata: dict[str, Any] | None = None,
) -> None:
"""Creates a new instance of ``PartitionedDataset``.
@@ -191,6 +194,10 @@ def __init__( # noqa: PLR0913
fs_args: Extra arguments to pass into underlying filesystem class constructor
(e.g. `{"project": "my-project"}` for ``GCSFileSystem``).
overwrite: If True, any existing partitions will be removed.
save_lazily: Parameter to enable/disable lazy saving; the default is True, meaning that if a callable object
is passed as data to save, the partition’s data will not be materialised until it is time to write.
Lazy saving example:
https://docs.kedro.org/en/stable/data/kedro_io.html#partitioned-dataset-lazy-saving
metadata: Any arbitrary metadata.
This is ignored by Kedro, but may be consumed by users or external plugins.
@@ -206,6 +213,7 @@ def __init__( # noqa: PLR0913
self._overwrite = overwrite
self._protocol = infer_storage_options(self._path)["protocol"]
self._partition_cache: Cache = Cache(maxsize=1)
self._save_lazily = save_lazily
self.metadata = metadata

dataset = dataset if isinstance(dataset, dict) else {"type": dataset}
@@ -311,7 +319,7 @@ def save(self, data: dict[str, Any]) -> None:
# join the protocol back since tools like PySpark may rely on it
kwargs[self._filepath_arg] = self._join_protocol(partition)
dataset = self._dataset_type(**kwargs) # type: ignore
if callable(partition_data):
if callable(partition_data) and self._save_lazily:
partition_data = partition_data() # noqa: PLW2901
dataset.save(partition_data)
self._invalidate_caches()
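The docstring above introduces the new `save_lazily` flag; the sketch below illustrates its expected effect under both settings. It is illustrative only and not part of the commit: the paths, variable names, and the `build_partition` helper are made up for the example, and the constructor arguments are those shown in the diff.

# Illustrative sketch only: expected behaviour of the new `save_lazily` flag,
# assuming local paths and the public PartitionedDataset API shown in this diff.
import pandas as pd
from kedro_datasets.partitions import PartitionedDataset

def build_partition() -> pd.DataFrame:
    # With lazy saving (the default), this callable is only invoked at write time.
    return pd.DataFrame({"foo": [1, 2, 3]})

# Default (save_lazily=True): the callable is materialised and its returned
# DataFrame is what the underlying pandas.CSVDataset persists.
lazy_pds = PartitionedDataset(
    path="data/partitioned",  # hypothetical path
    dataset="pandas.CSVDataset",
    filename_suffix=".csv",
)
lazy_pds.save({"part_1": build_partition})

# save_lazily=False: the callable is passed through unchanged, so a dataset that
# can serialise arbitrary objects (e.g. PickleDataset) stores the callable itself.
eager_pds = PartitionedDataset(
    path="data/partitioned_pickled",  # hypothetical path
    dataset="kedro_datasets.pickle.PickleDataset",
    save_lazily=False,
)
eager_pds.save({"part_1": build_partition})

This mirrors the change in `save()` above (`if callable(partition_data) and self._save_lazily:`) and the behaviour exercised by the new `test_callable_save` test, which persists the callable itself when lazy saving is disabled.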
23 changes: 23 additions & 0 deletions kedro-datasets/tests/partitions/test_partitioned_dataset.py
@@ -52,6 +52,10 @@ def filepath_csvs(tmp_path):
]


def original_data_callable():
return pd.DataFrame({"foo": 42, "bar": ["a", "b", None]})


class FakeDataset: # pylint: disable=too-few-public-methods
pass

@@ -101,6 +105,25 @@ def test_save(self, dataset, local_csvs, suffix):
reloaded_data = loaded_partitions[part_id]()
assert_frame_equal(reloaded_data, original_data)

@pytest.mark.parametrize("dataset", ["kedro_datasets.pickle.PickleDataset"])
@pytest.mark.parametrize("suffix", ["", ".csv"])
def test_callable_save(self, dataset, local_csvs, suffix):
pds = PartitionedDataset(
path=str(local_csvs),
dataset=dataset,
filename_suffix=suffix,
save_lazily=False,
)

part_id = "new/data"
pds.save({part_id: original_data_callable})

assert (local_csvs / "new" / ("data" + suffix)).is_file()
loaded_partitions = pds.load()
assert part_id in loaded_partitions
reloaded_data = loaded_partitions[part_id]()
assert reloaded_data == original_data_callable

@pytest.mark.parametrize("dataset", LOCAL_DATASET_DEFINITION)
@pytest.mark.parametrize("suffix", ["", ".csv"])
def test_lazy_save(self, dataset, local_csvs, suffix):
