diff --git a/docs/changelog/next_release/279.feature.rst b/docs/changelog/next_release/279.feature.rst
new file mode 100644
index 00000000..e7bddd68
--- /dev/null
+++ b/docs/changelog/next_release/279.feature.rst
@@ -0,0 +1,2 @@
+Added slots for data-rentgen plugin integration
+-- by :github:user:`marashka`
\ No newline at end of file
diff --git a/syncmaster/worker/controller.py b/syncmaster/worker/controller.py
index 61ec1403..4aaa9873 100644
--- a/syncmaster/worker/controller.py
+++ b/syncmaster/worker/controller.py
@@ -7,6 +7,7 @@
 from etl_entities.hwm_store import BaseHWMStore
 from horizon.client.auth import LoginPassword
 from horizon_hwm_store import HorizonHWMStore
+from onetl.hooks import slot, support_hooks
 from onetl.strategy import IncrementalStrategy

 from syncmaster.db.models import Connection, Run
@@ -146,6 +147,7 @@
 }


+@support_hooks
 class TransferController:
     settings: WorkerAppSettings
     source_handler: Handler
@@ -187,6 +189,7 @@ def __init__(
             temp_dir=TemporaryDirectory(dir=self.temp_dir.name, prefix="written_"),
         )

+    @slot
     def perform_transfer(self) -> None:
         try:
             spark = self.settings.worker.CREATE_SPARK_SESSION_FUNCTION(
diff --git a/syncmaster/worker/handlers/db/clickhouse.py b/syncmaster/worker/handlers/db/clickhouse.py
index 3e36e978..ec6e2e58 100644
--- a/syncmaster/worker/handlers/db/clickhouse.py
+++ b/syncmaster/worker/handlers/db/clickhouse.py
@@ -7,6 +7,7 @@
 from onetl.connection import Clickhouse
 from onetl.db import DBWriter
+from onetl.hooks import slot, support_hooks

 from syncmaster.dto.connections import ClickhouseConnectionDTO
 from syncmaster.dto.transfers import ClickhouseTransferDTO
@@ -17,6 +18,7 @@
     from pyspark.sql.dataframe import DataFrame


+@support_hooks
 class ClickhouseHandler(DBHandler):
     connection: Clickhouse
     connection_dto: ClickhouseConnectionDTO
@@ -40,6 +42,11 @@ def connect(self, spark: SparkSession):
             spark=spark,
         ).check()

+    @slot
+    def read(self) -> DataFrame:
+        return super().read()
+
+    @slot
     def write(self, df: DataFrame) -> None:
         normalized_df = self._normalize_column_names(df)
         sort_column = next(
diff --git a/syncmaster/worker/handlers/db/hive.py b/syncmaster/worker/handlers/db/hive.py
index 0ae7464c..98f3d567 100644
--- a/syncmaster/worker/handlers/db/hive.py
+++ b/syncmaster/worker/handlers/db/hive.py
@@ -6,6 +6,7 @@
 from typing import TYPE_CHECKING

 from onetl.connection import Hive
+from onetl.hooks import slot, support_hooks

 from syncmaster.dto.connections import HiveConnectionDTO
 from syncmaster.dto.transfers import HiveTransferDTO
@@ -16,6 +17,7 @@
     from pyspark.sql.dataframe import DataFrame


+@support_hooks
 class HiveHandler(DBHandler):
     connection: Hive
     connection_dto: HiveConnectionDTO
@@ -31,10 +33,15 @@ def connect(self, spark: SparkSession):
             spark=spark,
         ).check()

+    @slot
     def read(self) -> DataFrame:
         self.connection.spark.catalog.refreshTable(self.transfer_dto.table_name)
         return super().read()

+    @slot
+    def write(self, df: DataFrame) -> None:
+        return super().write(df)
+
     def _normalize_column_names(self, df: DataFrame) -> DataFrame:
         for column_name in df.columns:
             df = df.withColumnRenamed(column_name, column_name.lower())
diff --git a/syncmaster/worker/handlers/db/mssql.py b/syncmaster/worker/handlers/db/mssql.py
index a2ddb9d2..d1b043f8 100644
--- a/syncmaster/worker/handlers/db/mssql.py
+++ b/syncmaster/worker/handlers/db/mssql.py
@@ -6,6 +6,7 @@
 from typing import TYPE_CHECKING

 from onetl.connection import MSSQL
+from onetl.hooks import slot, support_hooks

 from syncmaster.dto.connections import MSSQLConnectionDTO
 from syncmaster.dto.transfers import MSSQLTransferDTO
@@ -16,6 +17,7 @@
     from pyspark.sql.dataframe import DataFrame


+@support_hooks
 class MSSQLHandler(DBHandler):
     connection: MSSQL
     connection_dto: MSSQLConnectionDTO
@@ -38,6 +40,14 @@ def connect(self, spark: SparkSession):
             spark=spark,
         ).check()

+    @slot
+    def read(self) -> DataFrame:
+        return super().read()
+
+    @slot
+    def write(self, df: DataFrame) -> None:
+        return super().write(df)
+
     def _normalize_column_names(self, df: DataFrame) -> DataFrame:
         for column_name in df.columns:
             df = df.withColumnRenamed(column_name, column_name.lower())
diff --git a/syncmaster/worker/handlers/db/mysql.py b/syncmaster/worker/handlers/db/mysql.py
index c991780b..49009abb 100644
--- a/syncmaster/worker/handlers/db/mysql.py
+++ b/syncmaster/worker/handlers/db/mysql.py
@@ -6,6 +6,7 @@
 from typing import TYPE_CHECKING

 from onetl.connection import MySQL
+from onetl.hooks import slot, support_hooks

 from syncmaster.dto.connections import MySQLConnectionDTO
 from syncmaster.dto.transfers import MySQLTransferDTO
@@ -16,6 +17,7 @@
     from pyspark.sql.dataframe import DataFrame


+@support_hooks
 class MySQLHandler(DBHandler):
     connection: MySQL
     connection_dto: MySQLConnectionDTO
@@ -35,6 +37,14 @@ def connect(self, spark: SparkSession):
             spark=spark,
         ).check()

+    @slot
+    def read(self) -> DataFrame:
+        return super().read()
+
+    @slot
+    def write(self, df: DataFrame) -> None:
+        return super().write(df)
+
     def _normalize_column_names(self, df: DataFrame) -> DataFrame:
         for column_name in df.columns:
             df = df.withColumnRenamed(column_name, column_name.lower())
diff --git a/syncmaster/worker/handlers/db/oracle.py b/syncmaster/worker/handlers/db/oracle.py
index a65af99b..c42e03bc 100644
--- a/syncmaster/worker/handlers/db/oracle.py
+++ b/syncmaster/worker/handlers/db/oracle.py
@@ -6,6 +6,7 @@
 from typing import TYPE_CHECKING

 from onetl.connection import Oracle
+from onetl.hooks import slot, support_hooks

 from syncmaster.dto.connections import OracleConnectionDTO
 from syncmaster.dto.transfers import OracleTransferDTO
@@ -16,6 +17,7 @@
     from pyspark.sql.dataframe import DataFrame


+@support_hooks
 class OracleHandler(DBHandler):
     connection: Oracle
     connection_dto: OracleConnectionDTO
@@ -37,6 +39,14 @@ def connect(self, spark: SparkSession):
             spark=spark,
         ).check()

+    @slot
+    def read(self) -> DataFrame:
+        return super().read()
+
+    @slot
+    def write(self, df: DataFrame) -> None:
+        return super().write(df)
+
     def _normalize_column_names(self, df: DataFrame) -> DataFrame:
         for column_name in df.columns:
             df = df.withColumnRenamed(column_name, column_name.upper())
diff --git a/syncmaster/worker/handlers/db/postgres.py b/syncmaster/worker/handlers/db/postgres.py
index eadfcba1..0d4b4506 100644
--- a/syncmaster/worker/handlers/db/postgres.py
+++ b/syncmaster/worker/handlers/db/postgres.py
@@ -6,6 +6,7 @@
 from typing import TYPE_CHECKING

 from onetl.connection import Postgres
+from onetl.hooks import slot, support_hooks

 from syncmaster.dto.connections import PostgresConnectionDTO
 from syncmaster.dto.transfers import PostgresTransferDTO
@@ -16,6 +17,7 @@
     from pyspark.sql.dataframe import DataFrame


+@support_hooks
 class PostgresHandler(DBHandler):
     connection: Postgres
     connection_dto: PostgresConnectionDTO
@@ -36,6 +38,14 @@ def connect(self, spark: SparkSession):
             spark=spark,
         ).check()

+    @slot
+    def read(self) -> DataFrame:
+        return super().read()
+
+    @slot
+    def write(self, df: DataFrame) -> None:
+        return super().write(df)
+
     def _normalize_column_names(self, df: DataFrame) -> DataFrame:
         for column_name in df.columns:
             df = df.withColumnRenamed(column_name, column_name.lower())
diff --git a/syncmaster/worker/handlers/file/ftp.py b/syncmaster/worker/handlers/file/ftp.py
index e278b749..2738231e 100644
--- a/syncmaster/worker/handlers/file/ftp.py
+++ b/syncmaster/worker/handlers/file/ftp.py
@@ -5,13 +5,16 @@
 from typing import TYPE_CHECKING

+from onetl.hooks import slot, support_hooks
+
 from syncmaster.dto.connections import FTPConnectionDTO
 from syncmaster.worker.handlers.file.local_df import LocalDFFileHandler

 if TYPE_CHECKING:
-    from pyspark.sql import SparkSession
+    from pyspark.sql import DataFrame, SparkSession


+@support_hooks
 class FTPHandler(LocalDFFileHandler):
     connection_dto: FTPConnectionDTO
@@ -28,3 +31,11 @@ def connect(self, spark: SparkSession) -> None:
         self.local_df_connection = SparkLocalFS(
             spark=spark,
         ).check()
+
+    @slot
+    def read(self) -> DataFrame:
+        return super().read()
+
+    @slot
+    def write(self, df: DataFrame) -> None:
+        return super().write(df)
diff --git a/syncmaster/worker/handlers/file/ftps.py b/syncmaster/worker/handlers/file/ftps.py
index 60ae292d..46e5640f 100644
--- a/syncmaster/worker/handlers/file/ftps.py
+++ b/syncmaster/worker/handlers/file/ftps.py
@@ -5,13 +5,16 @@
 from typing import TYPE_CHECKING

+from onetl.hooks import slot, support_hooks
+
 from syncmaster.dto.connections import FTPSConnectionDTO
 from syncmaster.worker.handlers.file.local_df import LocalDFFileHandler

 if TYPE_CHECKING:
-    from pyspark.sql import SparkSession
+    from pyspark.sql import DataFrame, SparkSession


+@support_hooks
 class FTPSHandler(LocalDFFileHandler):
     connection_dto: FTPSConnectionDTO
@@ -28,3 +31,11 @@ def connect(self, spark: SparkSession) -> None:
         self.local_df_connection = SparkLocalFS(
             spark=spark,
         ).check()
+
+    @slot
+    def read(self) -> DataFrame:
+        return super().read()
+
+    @slot
+    def write(self, df: DataFrame) -> None:
+        return super().write(df)
diff --git a/syncmaster/worker/handlers/file/hdfs.py b/syncmaster/worker/handlers/file/hdfs.py
index bcf69837..98e80c8f 100644
--- a/syncmaster/worker/handlers/file/hdfs.py
+++ b/syncmaster/worker/handlers/file/hdfs.py
@@ -5,13 +5,16 @@
 from typing import TYPE_CHECKING

+from onetl.hooks import slot, support_hooks
+
 from syncmaster.dto.connections import HDFSConnectionDTO
 from syncmaster.worker.handlers.file.remote_df import RemoteDFFileHandler

 if TYPE_CHECKING:
-    from pyspark.sql import SparkSession
+    from pyspark.sql import DataFrame, SparkSession


+@support_hooks
 class HDFSHandler(RemoteDFFileHandler):
     connection_dto: HDFSConnectionDTO
@@ -28,3 +31,11 @@ def connect(self, spark: SparkSession):
             user=self.connection_dto.user,
             password=self.connection_dto.password,
         ).check()
+
+    @slot
+    def read(self) -> DataFrame:
+        return super().read()
+
+    @slot
+    def write(self, df: DataFrame) -> None:
+        return super().write(df)
diff --git a/syncmaster/worker/handlers/file/s3.py b/syncmaster/worker/handlers/file/s3.py
index 45adb5af..ab248461 100644
--- a/syncmaster/worker/handlers/file/s3.py
+++ b/syncmaster/worker/handlers/file/s3.py
@@ -6,6 +6,7 @@
 from typing import TYPE_CHECKING

 from onetl.file import FileDFReader
+from onetl.hooks import slot, support_hooks

 from syncmaster.dto.connections import S3ConnectionDTO
 from syncmaster.worker.handlers.file.remote_df import RemoteDFFileHandler
@@ -14,6 +15,7 @@
     from pyspark.sql import DataFrame, SparkSession


+@support_hooks
 class S3Handler(RemoteDFFileHandler):
     connection_dto: S3ConnectionDTO
@@ -42,6 +44,7 @@ def connect(self, spark: SparkSession):
             region=self.connection_dto.region,
         ).check()

+    @slot
     def read(self) -> DataFrame:
         from pyspark.sql.types import StructType

@@ -67,3 +70,7 @@ def read(self) -> DataFrame:
             df = df.selectExpr(*columns_filter_expressions)

         return df
+
+    @slot
+    def write(self, df: DataFrame) -> None:
+        return super().write(df)
diff --git a/syncmaster/worker/handlers/file/samba.py b/syncmaster/worker/handlers/file/samba.py
index 89314dca..9a52825e 100644
--- a/syncmaster/worker/handlers/file/samba.py
+++ b/syncmaster/worker/handlers/file/samba.py
@@ -5,13 +5,16 @@
 from typing import TYPE_CHECKING

+from onetl.hooks import slot, support_hooks
+
 from syncmaster.dto.connections import SambaConnectionDTO
 from syncmaster.worker.handlers.file.local_df import LocalDFFileHandler

 if TYPE_CHECKING:
-    from pyspark.sql import SparkSession
+    from pyspark.sql import DataFrame, SparkSession


+@support_hooks
 class SambaHandler(LocalDFFileHandler):
     connection_dto: SambaConnectionDTO
@@ -32,3 +35,11 @@ def connect(self, spark: SparkSession) -> None:
         self.local_df_connection = SparkLocalFS(
             spark=spark,
         ).check()
+
+    @slot
+    def read(self) -> DataFrame:
+        return super().read()
+
+    @slot
+    def write(self, df: DataFrame) -> None:
+        return super().write(df)
diff --git a/syncmaster/worker/handlers/file/sftp.py b/syncmaster/worker/handlers/file/sftp.py
index 2c56ca19..23368e92 100644
--- a/syncmaster/worker/handlers/file/sftp.py
+++ b/syncmaster/worker/handlers/file/sftp.py
@@ -5,13 +5,16 @@
 from typing import TYPE_CHECKING

+from onetl.hooks import slot, support_hooks
+
 from syncmaster.dto.connections import SFTPConnectionDTO
 from syncmaster.worker.handlers.file.local_df import LocalDFFileHandler

 if TYPE_CHECKING:
-    from pyspark.sql import SparkSession
+    from pyspark.sql import DataFrame, SparkSession


+@support_hooks
 class SFTPHandler(LocalDFFileHandler):
     connection_dto: SFTPConnectionDTO
@@ -29,3 +32,11 @@ def connect(self, spark: SparkSession) -> None:
         self.local_df_connection = SparkLocalFS(
             spark=spark,
         ).check()
+
+    @slot
+    def read(self) -> DataFrame:
+        return super().read()
+
+    @slot
+    def write(self, df: DataFrame) -> None:
+        return super().write(df)
diff --git a/syncmaster/worker/handlers/file/webdav.py b/syncmaster/worker/handlers/file/webdav.py
index cb6da80a..60b1b9b9 100644
--- a/syncmaster/worker/handlers/file/webdav.py
+++ b/syncmaster/worker/handlers/file/webdav.py
@@ -6,14 +6,16 @@
 from typing import TYPE_CHECKING

 from onetl.connection import SparkLocalFS, WebDAV
+from onetl.hooks import slot, support_hooks

 from syncmaster.dto.connections import WebDAVConnectionDTO
 from syncmaster.worker.handlers.file.local_df import LocalDFFileHandler

 if TYPE_CHECKING:
-    from pyspark.sql import SparkSession
+    from pyspark.sql import DataFrame, SparkSession


+@support_hooks
 class WebDAVHandler(LocalDFFileHandler):
     connection_dto: WebDAVConnectionDTO
@@ -29,3 +31,11 @@ def connect(self, spark: SparkSession) -> None:
         self.local_df_connection = SparkLocalFS(
             spark=spark,
         ).check()
+
+    @slot
+    def read(self) -> DataFrame:
+        return super().read()
+
+    @slot
+    def write(self, df: DataFrame) -> None:
+        return super().write(df)
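
Below is a minimal sketch of how a plugin such as data-rentgen could attach to the slots added in this change. It assumes onetl's documented hook-registration API (onetl.hooks.hook plus the .bind method exposed by @slot); the report_transfer_started hook and its log message are purely illustrative and are not part of data-rentgen itself.

import logging

from onetl.hooks import hook

from syncmaster.worker.controller import TransferController

logger = logging.getLogger(__name__)


# Register a hook on the new TransferController.perform_transfer slot.
# Hooks are called with the same arguments as the slotted method, including the instance.
@TransferController.perform_transfer.bind
@hook
def report_transfer_started(controller: TransferController):
    # A real lineage plugin would emit run/operation events here instead of logging.
    logger.info("perform_transfer started on %r", controller)

The same pattern applies to the handler-level read and write slots, which is why each handler class now carries @support_hooks and per-method @slot decorators.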