chore(datasets): Fix delta + incremental dataset docstrings (#489)
Signed-off-by: Merel Theisen <[email protected]>
merelcht authored Dec 20, 2023
1 parent 6997b11 commit 124c6c9
Showing 7 changed files with 17 additions and 28 deletions.
5 changes: 1 addition & 4 deletions Makefile
@@ -34,15 +34,12 @@ dataset-doctest%:
exit 2; \
fi; \
\
# TODO(deepyaman): Fix as many doctests as possible (so that they run).
+ # The ignored datasets below require complicated setup with cloud/database clients which is overkill for the doctest examples.
cd kedro-datasets && pytest kedro_datasets --doctest-modules --doctest-continue-on-failure --no-cov \
- --ignore kedro_datasets/databricks/managed_table_dataset.py \
--ignore kedro_datasets/pandas/gbq_dataset.py \
- --ignore kedro_datasets/partitions/incremental_dataset.py \
- --ignore kedro_datasets/partitions/partitioned_dataset.py \
--ignore kedro_datasets/redis/redis_dataset.py \
--ignore kedro_datasets/snowflake/snowpark_dataset.py \
- --ignore kedro_datasets/spark/deltatable_dataset.py \
--ignore kedro_datasets/spark/spark_hive_dataset.py \
--ignore kedro_datasets/spark/spark_jdbc_dataset.py \
$(extra_pytest_arg${*})
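The target above runs every docstring example in kedro_datasets as a doctest via pytest's --doctest-modules collection, which is why the docstring fixes in the files below matter. A minimal sketch of the kind of docstring pytest picks up (a hypothetical module, not part of this commit):

    # hypothetical example module; pytest --doctest-modules executes the >>> lines
    def add_one(x: int) -> int:
        """Return x + 1.

        Example:
            >>> add_one(1)
            2
        """
        return x + 1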
2 changes: 1 addition & 1 deletion kedro-datasets/kedro_datasets/api/api_dataset.py
@@ -104,7 +104,7 @@ def __init__( # noqa: PLR0913
method: The method of the request. GET, POST, PUT are the only supported
methods
load_args: Additional parameters to be fed to requests.request.
- https://requests.readthedocs.io/en/latest/api/#requests.request
+ https://requests.readthedocs.io/en/latest/api.html#requests.request
save_args: Options for saving data on server. Includes all parameters used
during load method. Adds an optional parameter, ``chunk_size`` which
determines the size of the package sent at each request.
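As a rough usage sketch of the parameters documented above (the endpoint and query parameters are made up for illustration; load_args is forwarded as-is to requests.request):

    from kedro_datasets.api import APIDataset

    # hypothetical endpoint, for illustration only
    dataset = APIDataset(
        url="https://api.example.com/v1/data",
        method="GET",
        load_args={"params": {"format": "json"}, "timeout": 10},
    )
    response = dataset.load()  # the response object returned by requests.request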
kedro-datasets/kedro_datasets/databricks/managed_table_dataset.py
@@ -180,17 +180,19 @@ class ManagedTableDataset(AbstractVersionedDataset):
>>> from kedro_datasets.databricks import ManagedTableDataset
>>> from pyspark.sql import SparkSession
- >>> from pyspark.sql.types import IntegerType, StringType, StructField, StructType
+ >>> from pyspark.sql.types import IntegerType, Row, StringType, StructField, StructType
+ >>> import importlib_metadata
>>>
+ >>> DELTA_VERSION = importlib_metadata.version("delta-spark")
>>> schema = StructType(
... [StructField("name", StringType(), True), StructField("age", IntegerType(), True)]
... )
>>> data = [("Alex", 31), ("Bob", 12), ("Clarke", 65), ("Dave", 29)]
- >>> spark_df = SparkSession.builder.getOrCreate().createDataFrame(data, schema)
+ >>> spark_df = SparkSession.builder.config("spark.jars.packages", f"io.delta:delta-core_2.12:{DELTA_VERSION}").config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension").config("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog",).getOrCreate().createDataFrame(data, schema)
>>> dataset = ManagedTableDataset(table="names_and_ages", write_mode="overwrite")
>>> dataset.save(spark_df)
>>> reloaded = dataset.load()
- >>> reloaded.take(4)
+ >>> assert Row(name="Bob", age=12) in reloaded.take(4)
"""

# this dataset cannot be used with ``ParallelRunner``,
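The long chained config(...) call in the new docstring builds a Delta-enabled SparkSession inline. A roughly equivalent, more readable sketch using the configure_spark_with_delta_pip helper that ships with delta-spark (not part of the committed docstring):

    from delta import configure_spark_with_delta_pip
    from pyspark.sql import SparkSession

    builder = (
        SparkSession.builder.config(
            "spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"
        ).config(
            "spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        )
    )
    # configure_spark_with_delta_pip adds the matching io.delta delta package
    spark = configure_spark_with_delta_pip(builder).getOrCreate()
    spark_df = spark.createDataFrame(data, schema)  # data and schema as in the docstring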
12 changes: 1 addition & 11 deletions kedro-datasets/kedro_datasets/partitions/incremental_dataset.py
@@ -44,17 +44,7 @@ class IncrementalDataset(PartitionedDataset):
>>> from kedro_datasets.partitions import IncrementalDataset
>>>
- >>> # these credentials will be passed to:
- >>> # a) 'fsspec.filesystem()' call,
- >>> # b) the dataset initializer,
- >>> # c) the checkpoint initializer
- >>> credentials = {"key1": "secret1", "key2": "secret2"}
- >>>
- >>> dataset = IncrementalDataset(
- ... path="s3://bucket-name/path/to/folder",
- ... dataset="pandas.CSVDataset",
- ... credentials=credentials,
- ... )
+ >>> dataset = IncrementalDataset(path=str(tmp_path/ "test_data"), dataset="pandas.CSVDataset")
>>> loaded = dataset.load() # loads all available partitions
>>> # assert isinstance(loaded, dict)
>>>
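Unlike PartitionedDataset, IncrementalDataset loads partition data eagerly and only returns partitions newer than its checkpoint; the checkpoint is advanced explicitly. A minimal sketch continuing the example above (assuming pandas.CSVDataset partitions, as in the docstring):

    loaded = dataset.load()              # dict of partition id -> pandas DataFrame
    for partition_id, df in sorted(loaded.items()):
        ...                              # process each partition here
    dataset.confirm()                    # persist the checkpoint so these partitions
                                         # are not returned by the next load()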
kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py
@@ -86,9 +86,9 @@ class PartitionedDataset(AbstractDataset[dict[str, Any], dict[str, Callable[[],
... for day_of_month in df["DAY_OF_MONTH"]
... }
>>>
- >>> # Save it as small paritions with DAY_OF_MONTH as the partition key
+ >>> # Save it as small partitions with DAY_OF_MONTH as the partition key
>>> dataset = PartitionedDataset(
- ... path=tmp_path / "df_with_partition",
+ ... path=str(tmp_path / "df_with_partition"),
... dataset="pandas.CSVDataset",
... filename_suffix=".csv",
... )
@@ -105,7 +105,7 @@ class PartitionedDataset(AbstractDataset[dict[str, Any], dict[str, Callable[[],
... partition_data = partition_load_func()
...
>>> # Add the processing logic for individual partition HERE
- >>> print(partition_data)
+ >>> # print(partition_data)
You can also load multiple partitions from a remote storage and combine them
like this:
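As a rough illustration of that combine step (a minimal sketch assuming, as above, pandas.CSVDataset partitions; this is not the exact example from the full docstring):

    import pandas as pd

    partitions = dataset.load()  # dict of partition id -> load function
    combined = pd.concat(
        [load_func() for load_func in partitions.values()], ignore_index=True
    )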
9 changes: 5 additions & 4 deletions kedro-datasets/kedro_datasets/spark/deltatable_dataset.py
@@ -39,6 +39,7 @@ class DeltaTableDataset(AbstractDataset[None, DeltaTable]):
.. code-block:: pycon
+ >>> from delta import DeltaTable
>>> from kedro_datasets.spark import DeltaTableDataset, SparkDataset
>>> from pyspark.sql import SparkSession
>>> from pyspark.sql.types import StructField, StringType, IntegerType, StructType
@@ -50,13 +51,13 @@ class DeltaTableDataset(AbstractDataset[None, DeltaTable]):
>>> data = [("Alex", 31), ("Bob", 12), ("Clarke", 65), ("Dave", 29)]
>>>
>>> spark_df = SparkSession.builder.getOrCreate().createDataFrame(data, schema)
- >>>
- >>> dataset = SparkDataset(filepath=tmp_path / "test_data", file_format="delta")
+ >>> filepath = (tmp_path / "test_data").as_posix()
+ >>> dataset = SparkDataset(filepath=filepath, file_format="delta")
>>> dataset.save(spark_df)
- >>> deltatable_dataset = DeltaTableDataset(filepath=tmp_path / "test_data")
+ >>> deltatable_dataset = DeltaTableDataset(filepath=filepath)
>>> delta_table = deltatable_dataset.load()
>>>
- >>> delta_table.update()
+ >>> assert isinstance(delta_table, DeltaTable)
"""

# this dataset cannot be used with ``ParallelRunner``,
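The docstring now asserts the loaded object's type instead of calling update() with no arguments. For reference, a minimal sketch of a runnable update on the loaded DeltaTable handle (the condition and new value are made up for illustration):

    # delta_table as loaded in the docstring example above
    delta_table.update(
        condition="name = 'Bob'",    # SQL predicate selecting the rows to change
        set={"age": "13"},           # new value, given as a SQL expression string
    )
    delta_table.toDF().show()        # inspect the updated table as a Spark DataFrame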
3 changes: 1 addition & 2 deletions kedro-datasets/setup.py
@@ -173,8 +173,7 @@ def _collect_requirements(requires):
"compress-pickle[lz4]~=2.1.0",
"coverage[toml]",
"dask[complete]~=2021.10", # pinned by Snyk to avoid a vulnerability
"delta-spark>=1.2.1; python_version >= '3.11'", # 1.2.0 has a bug that breaks some of our tests: https://github.com/delta-io/delta/issues/1070
"delta-spark~=1.2.1; python_version < '3.11'",
"delta-spark>=1.0, <3.0",
"deltalake>=0.10.0",
"dill~=0.3.1",
"filelock>=3.4.0, <4.0",
