diff --git a/kedro-datasets/kedro_datasets_experimental/databricks/external_table_dataset.py b/kedro-datasets/kedro_datasets_experimental/databricks/external_table_dataset.py index c85687d97..29a545df7 100644 --- a/kedro-datasets/kedro_datasets_experimental/databricks/external_table_dataset.py +++ b/kedro-datasets/kedro_datasets_experimental/databricks/external_table_dataset.py @@ -8,10 +8,7 @@ from typing import Any import pandas as pd -import pandas as pd -from kedro.io.core import ( - DatasetError -) +from kedro.io.core import DatasetError from pyspark.sql import DataFrame from kedro_datasets.databricks._base_table_dataset import BaseTable, BaseTableDataset @@ -26,7 +23,7 @@ class ExternalTable(BaseTable): def _validate_location(self) -> None: """Validates that a location is provided if the table does not exist. - + Raises: DatasetError: If the table does not exist and no location is provided. """ @@ -35,10 +32,10 @@ def _validate_location(self) -> None: "If the external table does not exists, the `location` parameter must be provided. " "This should be valid path in an external location that has already been created." ) - + def _validate_write_mode(self) -> None: """Validates that the write mode is compatible with the format. - + Raises: DatasetError: If the write mode is not compatible with the format. """ @@ -49,7 +46,7 @@ def _validate_write_mode(self) -> None: f"Format '{self.format}' is not supported for upserts. " f"Please use 'delta' format." ) - + if self.write_mode == "overwrite" and self.format != "delta" and not self.location: raise DatasetError( f"Format '{self.format}' is supported for overwrites only if the location is provided. " @@ -150,7 +147,7 @@ def _create_table( # noqa: PLR0913 primary_key=primary_key, format=format ) - + def _save_overwrite(self, data: DataFrame) -> None: """Overwrites the data in the table with the data provided. Args: @@ -159,7 +156,7 @@ def _save_overwrite(self, data: DataFrame) -> None: writer = data.write.format(self._table.format).mode("overwrite").option( "overwriteSchema", "true" ) - + if self._table.partition_columns: writer.partitionBy( *self._table.partition_columns if isinstance(self._table.partition_columns, list) else self._table.partition_columns