diff --git a/syncmaster/worker/handlers/db/base.py b/syncmaster/worker/handlers/db/base.py
index 22bf6b6a..e3d91999 100644
--- a/syncmaster/worker/handlers/db/base.py
+++ b/syncmaster/worker/handlers/db/base.py
@@ -8,6 +8,7 @@

 from onetl.base import BaseDBConnection
 from onetl.db import DBReader, DBWriter
+from onetl.hooks import slot, support_hooks

 from syncmaster.dto.transfers import DBTransferDTO
 from syncmaster.worker.handlers.base import Handler
@@ -16,6 +17,7 @@
     from pyspark.sql.dataframe import DataFrame


+@support_hooks
 class DBHandler(Handler):
     connection: BaseDBConnection
     transfer_dto: DBTransferDTO
@@ -35,6 +37,7 @@ class DBHandler(Handler):
         "not_ilike": "NOT ILIKE",
     }

+    @slot
     def read(self) -> DataFrame:
         reader_params = {}
         if self.transfer_dto.strategy.type == "incremental":
@@ -58,6 +61,7 @@ def read(self) -> DataFrame:
         )
         return reader.run()

+    @slot
     def write(self, df: DataFrame) -> None:
         if self.transfer_dto.strategy.type == "incremental" and self.hwm and self.hwm.value:
             self.transfer_dto.options["if_exists"] = "append"
diff --git a/syncmaster/worker/handlers/db/clickhouse.py b/syncmaster/worker/handlers/db/clickhouse.py
index ec6e2e58..a2e04aa5 100644
--- a/syncmaster/worker/handlers/db/clickhouse.py
+++ b/syncmaster/worker/handlers/db/clickhouse.py
@@ -7,7 +7,7 @@

 from onetl.connection import Clickhouse
 from onetl.db import DBWriter
-from onetl.hooks import slot, support_hooks
+from onetl.hooks import slot

 from syncmaster.dto.connections import ClickhouseConnectionDTO
 from syncmaster.dto.transfers import ClickhouseTransferDTO
@@ -18,7 +18,6 @@
     from pyspark.sql.dataframe import DataFrame


-@support_hooks
 class ClickhouseHandler(DBHandler):
     connection: Clickhouse
     connection_dto: ClickhouseConnectionDTO
@@ -42,10 +41,6 @@ def connect(self, spark: SparkSession):
             spark=spark,
         ).check()

-    @slot
-    def read(self) -> DataFrame:
-        return super().read()
-
     @slot
     def write(self, df: DataFrame) -> None:
         normalized_df = self._normalize_column_names(df)
diff --git a/syncmaster/worker/handlers/db/hive.py b/syncmaster/worker/handlers/db/hive.py
index 98f3d567..90b9a23f 100644
--- a/syncmaster/worker/handlers/db/hive.py
+++ b/syncmaster/worker/handlers/db/hive.py
@@ -38,10 +38,6 @@ def read(self) -> DataFrame:
         self.connection.spark.catalog.refreshTable(self.transfer_dto.table_name)
         return super().read()

-    @slot
-    def write(self, df: DataFrame) -> None:
-        return super().write(df)
-
     def _normalize_column_names(self, df: DataFrame) -> DataFrame:
         for column_name in df.columns:
             df = df.withColumnRenamed(column_name, column_name.lower())
diff --git a/syncmaster/worker/handlers/db/iceberg.py b/syncmaster/worker/handlers/db/iceberg.py
index 80e37531..779c6a33 100644
--- a/syncmaster/worker/handlers/db/iceberg.py
+++ b/syncmaster/worker/handlers/db/iceberg.py
@@ -6,7 +6,6 @@
 from typing import TYPE_CHECKING

 from onetl.connection import Iceberg
-from onetl.hooks import slot, support_hooks

 from syncmaster.dto.connections import IcebergRESTCatalogS3ConnectionDTO
 from syncmaster.dto.transfers import IcebergRESTCatalogS3TransferDTO
@@ -17,7 +16,6 @@
     from pyspark.sql.dataframe import DataFrame


-@support_hooks
 class IcebergRESTCatalogS3Handler(DBHandler):
     connection: Iceberg
     connection_dto: IcebergRESTCatalogS3ConnectionDTO
@@ -51,14 +49,6 @@ def connect(self, spark: SparkSession):
             ),
         ).check()

-    @slot
-    def read(self) -> DataFrame:
-        return super().read()
-
-    @slot
-    def write(self, df: DataFrame) -> None:
-        return super().write(df)
-
     def _normalize_column_names(self, df: DataFrame) -> DataFrame:
         for column_name in df.columns:
             df = df.withColumnRenamed(column_name, column_name.lower())
diff --git a/syncmaster/worker/handlers/db/mssql.py b/syncmaster/worker/handlers/db/mssql.py
index d1b043f8..a2ddb9d2 100644
--- a/syncmaster/worker/handlers/db/mssql.py
+++ b/syncmaster/worker/handlers/db/mssql.py
@@ -6,7 +6,6 @@
 from typing import TYPE_CHECKING

 from onetl.connection import MSSQL
-from onetl.hooks import slot, support_hooks

 from syncmaster.dto.connections import MSSQLConnectionDTO
 from syncmaster.dto.transfers import MSSQLTransferDTO
@@ -17,7 +16,6 @@
     from pyspark.sql.dataframe import DataFrame


-@support_hooks
 class MSSQLHandler(DBHandler):
     connection: MSSQL
     connection_dto: MSSQLConnectionDTO
@@ -40,14 +38,6 @@ def connect(self, spark: SparkSession):
             spark=spark,
         ).check()

-    @slot
-    def read(self) -> DataFrame:
-        return super().read()
-
-    @slot
-    def write(self, df: DataFrame) -> None:
-        return super().write(df)
-
     def _normalize_column_names(self, df: DataFrame) -> DataFrame:
         for column_name in df.columns:
             df = df.withColumnRenamed(column_name, column_name.lower())
diff --git a/syncmaster/worker/handlers/db/mysql.py b/syncmaster/worker/handlers/db/mysql.py
index 49009abb..c991780b 100644
--- a/syncmaster/worker/handlers/db/mysql.py
+++ b/syncmaster/worker/handlers/db/mysql.py
@@ -6,7 +6,6 @@
 from typing import TYPE_CHECKING

 from onetl.connection import MySQL
-from onetl.hooks import slot, support_hooks

 from syncmaster.dto.connections import MySQLConnectionDTO
 from syncmaster.dto.transfers import MySQLTransferDTO
@@ -17,7 +16,6 @@
     from pyspark.sql.dataframe import DataFrame


-@support_hooks
 class MySQLHandler(DBHandler):
     connection: MySQL
     connection_dto: MySQLConnectionDTO
@@ -37,14 +35,6 @@ def connect(self, spark: SparkSession):
             spark=spark,
         ).check()

-    @slot
-    def read(self) -> DataFrame:
-        return super().read()
-
-    @slot
-    def write(self, df: DataFrame) -> None:
-        return super().write(df)
-
     def _normalize_column_names(self, df: DataFrame) -> DataFrame:
         for column_name in df.columns:
             df = df.withColumnRenamed(column_name, column_name.lower())
diff --git a/syncmaster/worker/handlers/db/oracle.py b/syncmaster/worker/handlers/db/oracle.py
index c42e03bc..a65af99b 100644
--- a/syncmaster/worker/handlers/db/oracle.py
+++ b/syncmaster/worker/handlers/db/oracle.py
@@ -6,7 +6,6 @@
 from typing import TYPE_CHECKING

 from onetl.connection import Oracle
-from onetl.hooks import slot, support_hooks

 from syncmaster.dto.connections import OracleConnectionDTO
 from syncmaster.dto.transfers import OracleTransferDTO
@@ -17,7 +16,6 @@
     from pyspark.sql.dataframe import DataFrame


-@support_hooks
 class OracleHandler(DBHandler):
     connection: Oracle
     connection_dto: OracleConnectionDTO
@@ -39,14 +37,6 @@ def connect(self, spark: SparkSession):
             spark=spark,
         ).check()

-    @slot
-    def read(self) -> DataFrame:
-        return super().read()
-
-    @slot
-    def write(self, df: DataFrame) -> None:
-        return super().write(df)
-
     def _normalize_column_names(self, df: DataFrame) -> DataFrame:
         for column_name in df.columns:
             df = df.withColumnRenamed(column_name, column_name.upper())
diff --git a/syncmaster/worker/handlers/db/postgres.py b/syncmaster/worker/handlers/db/postgres.py
index 0d4b4506..eadfcba1 100644
--- a/syncmaster/worker/handlers/db/postgres.py
+++ b/syncmaster/worker/handlers/db/postgres.py
@@ -6,7 +6,6 @@
 from typing import TYPE_CHECKING

 from onetl.connection import Postgres
-from onetl.hooks import slot, support_hooks

 from syncmaster.dto.connections import PostgresConnectionDTO
 from syncmaster.dto.transfers import PostgresTransferDTO
@@ -17,7 +16,6 @@
     from pyspark.sql.dataframe import DataFrame


-@support_hooks
 class PostgresHandler(DBHandler):
     connection: Postgres
     connection_dto: PostgresConnectionDTO
@@ -38,14 +36,6 @@ def connect(self, spark: SparkSession):
             spark=spark,
         ).check()

-    @slot
-    def read(self) -> DataFrame:
-        return super().read()
-
-    @slot
-    def write(self, df: DataFrame) -> None:
-        return super().write(df)
-
     def _normalize_column_names(self, df: DataFrame) -> DataFrame:
         for column_name in df.columns:
             df = df.withColumnRenamed(column_name, column_name.lower())
diff --git a/syncmaster/worker/handlers/file/ftp.py b/syncmaster/worker/handlers/file/ftp.py
index 2738231e..e278b749 100644
--- a/syncmaster/worker/handlers/file/ftp.py
+++ b/syncmaster/worker/handlers/file/ftp.py
@@ -5,16 +5,13 @@

 from typing import TYPE_CHECKING

-from onetl.hooks import slot, support_hooks
-
 from syncmaster.dto.connections import FTPConnectionDTO
 from syncmaster.worker.handlers.file.local_df import LocalDFFileHandler

 if TYPE_CHECKING:
-    from pyspark.sql import DataFrame, SparkSession
+    from pyspark.sql import SparkSession


-@support_hooks
 class FTPHandler(LocalDFFileHandler):
     connection_dto: FTPConnectionDTO

@@ -31,11 +28,3 @@ def connect(self, spark: SparkSession) -> None:
         self.local_df_connection = SparkLocalFS(
             spark=spark,
         ).check()
-
-    @slot
-    def read(self) -> DataFrame:
-        return super().read()
-
-    @slot
-    def write(self, df: DataFrame) -> None:
-        return super().write(df)
diff --git a/syncmaster/worker/handlers/file/ftps.py b/syncmaster/worker/handlers/file/ftps.py
index 46e5640f..60ae292d 100644
--- a/syncmaster/worker/handlers/file/ftps.py
+++ b/syncmaster/worker/handlers/file/ftps.py
@@ -5,16 +5,13 @@

 from typing import TYPE_CHECKING

-from onetl.hooks import slot, support_hooks
-
 from syncmaster.dto.connections import FTPSConnectionDTO
 from syncmaster.worker.handlers.file.local_df import LocalDFFileHandler

 if TYPE_CHECKING:
-    from pyspark.sql import DataFrame, SparkSession
+    from pyspark.sql import SparkSession


-@support_hooks
 class FTPSHandler(LocalDFFileHandler):
     connection_dto: FTPSConnectionDTO

@@ -31,11 +28,3 @@ def connect(self, spark: SparkSession) -> None:
         self.local_df_connection = SparkLocalFS(
             spark=spark,
         ).check()
-
-    @slot
-    def read(self) -> DataFrame:
-        return super().read()
-
-    @slot
-    def write(self, df: DataFrame) -> None:
-        return super().write(df)
diff --git a/syncmaster/worker/handlers/file/hdfs.py b/syncmaster/worker/handlers/file/hdfs.py
index 98e80c8f..bcf69837 100644
--- a/syncmaster/worker/handlers/file/hdfs.py
+++ b/syncmaster/worker/handlers/file/hdfs.py
@@ -5,16 +5,13 @@

 from typing import TYPE_CHECKING

-from onetl.hooks import slot, support_hooks
-
 from syncmaster.dto.connections import HDFSConnectionDTO
 from syncmaster.worker.handlers.file.remote_df import RemoteDFFileHandler

 if TYPE_CHECKING:
-    from pyspark.sql import DataFrame, SparkSession
+    from pyspark.sql import SparkSession


-@support_hooks
 class HDFSHandler(RemoteDFFileHandler):
     connection_dto: HDFSConnectionDTO

@@ -31,11 +28,3 @@ def connect(self, spark: SparkSession):
             user=self.connection_dto.user,
             password=self.connection_dto.password,
         ).check()
-
-    @slot
-    def read(self) -> DataFrame:
-        return super().read()
-
-    @slot
-    def write(self, df: DataFrame) -> None:
-        return super().write(df)
diff --git a/syncmaster/worker/handlers/file/local_df.py b/syncmaster/worker/handlers/file/local_df.py
index 3c510bb1..6d241b85 100644
--- a/syncmaster/worker/handlers/file/local_df.py
+++ b/syncmaster/worker/handlers/file/local_df.py
@@ -9,6 +9,7 @@
 from etl_entities.hwm import FileListHWM, FileModifiedTimeHWM
 from onetl.file import FileDFReader, FileDFWriter, FileDownloader, FileUploader
 from onetl.file.filter import FileSizeRange, Glob, Regexp
+from onetl.hooks import slot, support_hooks

 from syncmaster.worker.handlers.file.base import FileHandler

@@ -16,8 +17,10 @@
     from pyspark.sql import DataFrame


+@support_hooks
 class LocalDFFileHandler(FileHandler):

+    @slot
     def read(self) -> DataFrame:
         from pyspark.sql.types import StructType

@@ -59,8 +62,8 @@ def read(self) -> DataFrame:

         return df

+    @slot
     def write(self, df: DataFrame) -> None:
-
         writer = FileDFWriter(
             connection=self.local_df_connection,
             format=self.transfer_dto.file_format,
diff --git a/syncmaster/worker/handlers/file/remote_df.py b/syncmaster/worker/handlers/file/remote_df.py
index 57dd9a6b..f77ed71a 100644
--- a/syncmaster/worker/handlers/file/remote_df.py
+++ b/syncmaster/worker/handlers/file/remote_df.py
@@ -7,6 +7,7 @@

 from onetl.file import FileDFReader, FileDFWriter, FileMover
 from onetl.file.filter import Glob
+from onetl.hooks import slot, support_hooks

 from syncmaster.worker.handlers.file.base import FileHandler

@@ -14,8 +15,10 @@
     from pyspark.sql import DataFrame


+@support_hooks
 class RemoteDFFileHandler(FileHandler):

+    @slot
     def read(self) -> DataFrame:
         from pyspark.sql.types import StructType

@@ -38,6 +41,7 @@ def read(self) -> DataFrame:

         return df

+    @slot
     def write(self, df: DataFrame) -> None:
         tmp_path = os.path.join(self.transfer_dto.directory_path, ".tmp", str(self.run_dto.id))
         try:
diff --git a/syncmaster/worker/handlers/file/s3.py b/syncmaster/worker/handlers/file/s3.py
index ab248461..01e0db1f 100644
--- a/syncmaster/worker/handlers/file/s3.py
+++ b/syncmaster/worker/handlers/file/s3.py
@@ -70,7 +70,3 @@ def read(self) -> DataFrame:
             df = df.selectExpr(*columns_filter_expressions)

         return df
-
-    @slot
-    def write(self, df: DataFrame) -> None:
-        return super().write(df)
diff --git a/syncmaster/worker/handlers/file/samba.py b/syncmaster/worker/handlers/file/samba.py
index 9a52825e..89314dca 100644
--- a/syncmaster/worker/handlers/file/samba.py
+++ b/syncmaster/worker/handlers/file/samba.py
@@ -5,16 +5,13 @@

 from typing import TYPE_CHECKING

-from onetl.hooks import slot, support_hooks
-
 from syncmaster.dto.connections import SambaConnectionDTO
 from syncmaster.worker.handlers.file.local_df import LocalDFFileHandler

 if TYPE_CHECKING:
-    from pyspark.sql import DataFrame, SparkSession
+    from pyspark.sql import SparkSession


-@support_hooks
 class SambaHandler(LocalDFFileHandler):
     connection_dto: SambaConnectionDTO

@@ -35,11 +32,3 @@ def connect(self, spark: SparkSession) -> None:
         self.local_df_connection = SparkLocalFS(
             spark=spark,
         ).check()
-
-    @slot
-    def read(self) -> DataFrame:
-        return super().read()
-
-    @slot
-    def write(self, df: DataFrame) -> None:
-        return super().write(df)
diff --git a/syncmaster/worker/handlers/file/sftp.py b/syncmaster/worker/handlers/file/sftp.py
index 23368e92..2c56ca19 100644
--- a/syncmaster/worker/handlers/file/sftp.py
+++ b/syncmaster/worker/handlers/file/sftp.py
@@ -5,16 +5,13 @@

 from typing import TYPE_CHECKING

-from onetl.hooks import slot, support_hooks
-
 from syncmaster.dto.connections import SFTPConnectionDTO
 from syncmaster.worker.handlers.file.local_df import LocalDFFileHandler

 if TYPE_CHECKING:
-    from pyspark.sql import DataFrame, SparkSession
+    from pyspark.sql import SparkSession


-@support_hooks
 class SFTPHandler(LocalDFFileHandler):
     connection_dto: SFTPConnectionDTO

@@ -32,11 +29,3 @@ def connect(self, spark: SparkSession) -> None:
         self.local_df_connection = SparkLocalFS(
             spark=spark,
         ).check()
-
-    @slot
-    def read(self) -> DataFrame:
-        return super().read()
-
-    @slot
-    def write(self, df: DataFrame) -> None:
-        return super().write(df)
diff --git a/syncmaster/worker/handlers/file/webdav.py b/syncmaster/worker/handlers/file/webdav.py
index 60b1b9b9..cb6da80a 100644
--- a/syncmaster/worker/handlers/file/webdav.py
+++ b/syncmaster/worker/handlers/file/webdav.py
@@ -6,16 +6,14 @@
 from typing import TYPE_CHECKING

 from onetl.connection import SparkLocalFS, WebDAV
-from onetl.hooks import slot, support_hooks

 from syncmaster.dto.connections import WebDAVConnectionDTO
 from syncmaster.worker.handlers.file.local_df import LocalDFFileHandler

 if TYPE_CHECKING:
-    from pyspark.sql import DataFrame, SparkSession
+    from pyspark.sql import SparkSession


-@support_hooks
 class WebDAVHandler(LocalDFFileHandler):
     connection_dto: WebDAVConnectionDTO

@@ -31,11 +29,3 @@ def connect(self, spark: SparkSession) -> None:
         self.local_df_connection = SparkLocalFS(
             spark=spark,
         ).check()
-
-    @slot
-    def read(self) -> DataFrame:
-        return super().read()
-
-    @slot
-    def write(self, df: DataFrame) -> None:
-        return super().write(df)
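
For reference, the pattern this diff settles on is onetl's hooks mechanism: the base handlers (DBHandler, LocalDFFileHandler, RemoteDFFileHandler) are decorated with @support_hooks and their read/write methods with @slot, so subclasses that merely inherit those methods no longer need decorators of their own; only subclasses that still override a method (ClickhouseHandler.write, HiveHandler.read, S3Handler.read) keep @slot on the override. Below is a minimal sketch of that behaviour, using hypothetical BaseHandler/ChildHandler names and assuming onetl's documented hook/.bind registration API; it is an illustration, not code from this repository.

from onetl.hooks import hook, slot, support_hooks


@support_hooks
class BaseHandler:
    # Hypothetical stand-in for DBHandler / LocalDFFileHandler / RemoteDFFileHandler.

    @slot
    def read(self):
        return ["row"]

    @slot
    def write(self, df):
        print(f"writing {df!r}")


class ChildHandler(BaseHandler):
    # No read()/write() override: the slot-wrapped methods are inherited as-is,
    # so the subclass needs neither @support_hooks nor @slot (like MySQLHandler or FTPHandler above).
    pass


# A hook bound to the base-class slot; it receives the same arguments as the slot call.
@BaseHandler.read.bind
@hook
def log_read(self):
    print(f"read() called on {type(self).__name__}")


ChildHandler().read()  # should fire the hook for the subclass too, since the inherited method is the same decorated object

Subclasses that keep an override, such as ClickhouseHandler.write in this diff, re-decorate it with @slot, which is why clickhouse.py trims its import to `from onetl.hooks import slot` rather than dropping the import entirely.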