Commit: mypy fixes

abhi8893 committed Dec 20, 2024
1 parent 89276f9 commit b80810d
Showing 1 changed file with 43 additions and 47 deletions.
90 changes: 43 additions & 47 deletions kedro-datasets/kedro_datasets/spark/spark_gbq_dataset.py
@@ -1,23 +1,24 @@
"""``AbstractDataset`` implementation to access Spark dataframes using
``pyspark``.
"""

from __future__ import annotations

import base64
import json
from typing import Dict, Any
import logging
from copy import deepcopy
from typing import Any, NoReturn

from kedro.io import AbstractDataset
import pandas as pd
from pyspark.sql import DataFrame
from kedro_datasets._utils.spark_utils import get_spark
from py4j.protocol import Py4JJavaError
from pyspark.sql import DataFrame


import pyspark.sql as sql
from copy import deepcopy
import logging
from kedro_datasets._utils.spark_utils import get_spark

logger = logging.getLogger(__name__)


class GBQQueryDataset(AbstractDataset[None, DataFrame]):
"""``GBQQueryDataset`` loads data from Google BigQuery with a SQL query using BigQuery Spark connector.
@@ -27,8 +28,8 @@ class GBQQueryDataset(AbstractDataset[None, DataFrame]):
.. code-block:: yaml
my_gbq_data:
type: spark.SparkGBQQueryDataset
my_gbq_spark_data:
type: spark.GBQQueryDataset
sql: |
SELECT * FROM your_table
materialization_dataset: your_dataset
@@ -40,34 +41,32 @@ class GBQQueryDataset(AbstractDataset[None, DataFrame]):
`Python API <https://docs.kedro.org/en/stable/data/\
advanced_data_catalog_usage.html>`_:
::
.. code-block:: pycon
>>> from kedro_datasets.spark import SparkGBQQueryDataset
>>> from kedro_datasets.spark import GBQQueryDataset
>>> import pyspark.sql as sql
>>>
>>> # Define your SQL query
>>> sql = "SELECT * FROM your_table"
>>>
>>> # Initialize dataset
>>> dataset = SparkGBQQueryDataset(
>>> sql=sql,
>>> materialization_dataset="your_dataset",
>>> materialization_project="your_project", # optional
>>> credentials=dict(file="/path/to/your/credentials.json"),
>>> )
>>> dataset = GBQQueryDataset(
... sql=sql,
... materialization_dataset="your_dataset",
... materialization_project="your_project", # optional
... credentials=dict(file="/path/to/your/credentials.json"),
... )
>>>
>>> # Load data
>>> df = dataset.load()
>>>
>>> # Example output
>>> df.show()
+---+-----+
| id| name|
+---+-----+
| 1| Alice|
| 2| Bob|
+---+-----+
"""

_VALID_CREDENTIALS_KEYS = {"base64", "file", "json"}

def __init__(
def __init__( # noqa: PLR0913
self,
sql: str,
materialization_dataset: str,
@@ -81,7 +80,7 @@ def __init__(
Args:
sql: SQL query to execute.
materialization_dataset: The name of the dataset to materialize the query results.
materialization_project: The name of the project to materialize the query results.
Optional (defaults to the project id set by the credentials).
load_args: Load args passed to Spark DataFrameReader load method.
It is dependent on the selected file format. You can find
@@ -90,7 +89,7 @@ def __init__(
https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_df.html
credentials: Credentials to authenticate spark session with google bigquery.
Dictionary with key specifying the type of credentials ('base64', 'file', 'json').
Read more here:
https://github.com/GoogleCloudDataproc/spark-bigquery-connector?tab=readme-ov-file#how-do-i-authenticate-outside-gce--dataproc
metadata: Any arbitrary metadata.
This is ignored by Kedro, but may be consumed by users or external plugins.
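Not part of the commit: a minimal, hypothetical sketch of the three credential forms the docstring above describes, each passed as a single-key dict. All paths, key material, and project names below are placeholders.

from kedro_datasets.spark import GBQQueryDataset

query = "SELECT id, name FROM your_dataset.your_table"  # placeholder query

# 1. Path to a service-account key file on disk.
ds_file = GBQQueryDataset(
    sql=query,
    materialization_dataset="your_dataset",
    credentials={"file": "/path/to/service-account.json"},
)

# 2. The same key already base64-encoded (e.g. injected via an environment variable).
ds_b64 = GBQQueryDataset(
    sql=query,
    materialization_dataset="your_dataset",
    credentials={"base64": "eyJ0eXBlIjogInNlcnZpY2VfYWNjb3VudCJ9"},  # placeholder value
)

# 3. The key as an in-memory dict; the dataset base64-encodes it before
#    handing it to the Spark BigQuery connector.
ds_json = GBQQueryDataset(
    sql=query,
    materialization_dataset="your_dataset",
    credentials={"json": {"type": "service_account", "project_id": "your_project"}},
)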
@@ -103,10 +102,10 @@ def __init__(
self._metadata = metadata

def _get_spark_credentials(self) -> dict[str, str]:

if len(self._credentials) > 1:
raise ValueError(
f"Please provide only one of 'base64', 'file' or 'json' key in the credentials. You provided: {list(self._credentials.keys())}"
"Please provide only one of 'base64', 'file' or 'json' key in the credentials. "
"You provided: {list(self._credentials.keys())}"
)
if self._credentials.get("base64"):
return {
@@ -117,13 +116,13 @@ def _get_spark_credentials(self) -> dict[str, str]:
"credentialsFile": self._credentials["file"],
}
if self._credentials.get("json"):
creds_b64 = base64.b64encode(json.dumps(self._credentials["json"]).encode("utf-8")).decode("utf-8")
return {
"credentials": creds_b64
}
creds_b64 = base64.b64encode(
json.dumps(self._credentials["json"]).encode("utf-8")
).decode("utf-8")
return {"credentials": creds_b64}

return {}

def _get_spark_load_args(self) -> dict[str, Any]:
spark_load_args = deepcopy(self._load_args)
spark_load_args["query"] = self._sql
@@ -145,14 +144,12 @@ def _get_spark_load_args(self) -> dict[str, Any]:
"The 'viewsEnabled' configuration is not set to 'true' in the SparkSession. "
"This is required for the Spark BigQuery connector to read via a SQL query. "
"Setting 'viewsEnabled' to 'true' for the current query read operation. "
"This may incur additional costs!"
"This may incur additional costs!"
)

return spark_load_args
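Not part of the commit: a sketch of setting 'viewsEnabled' once on the SparkSession, so the per-read override (and its warning) above never triggers, assuming the connector picks the flag up from the session conf as the message implies. The app name is a placeholder.

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("gbq-query-read")  # placeholder app name
    .config("viewsEnabled", "true")
    .getOrCreate()
)

# Or on a session that already exists:
spark.conf.set("viewsEnabled", "true")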



def load(self) -> sql.DataFrame:
def load(self) -> DataFrame:
"""Loads data from Google BigQuery.
Returns:
@@ -163,17 +160,16 @@ def load(self) -> sql.DataFrame:

return read_obj.load(**self._get_spark_load_args())

def save(self, data: sql.DataFrame) -> None:
raise NotImplementedError("Save method is not implemented for SparkGBQQueryDataset")

def _exists(self) -> None:
raise NotImplementedError("Exists method is not implemented for SparkGBQQueryDataset")

def _describe(self) -> Dict[str, Any]:
def save(self, data: None) -> NoReturn:
raise NotImplementedError(
"Save method is not implemented for SparkGBQQueryDataset"
)

def _describe(self) -> dict[str, Any]:
return {
"sql": self._sql,
"materialization_dataset": self._materialization_dataset,
"materialization_project": self._materialization_project,
"load_args": self._load_args,
"metadata": self._metadata,
}
