4 changes: 4 additions & 0 deletions syncmaster/worker/handlers/db/base.py
@@ -8,6 +8,7 @@

from onetl.base import BaseDBConnection
from onetl.db import DBReader, DBWriter
from onetl.hooks import slot, support_hooks

from syncmaster.dto.transfers import DBTransferDTO
from syncmaster.worker.handlers.base import Handler
@@ -16,6 +17,7 @@
from pyspark.sql.dataframe import DataFrame


@support_hooks
class DBHandler(Handler):
connection: BaseDBConnection
transfer_dto: DBTransferDTO
@@ -35,6 +37,7 @@ class DBHandler(Handler):
"not_ilike": "NOT ILIKE",
}

@slot
def read(self) -> DataFrame:
reader_params = {}
if self.transfer_dto.strategy.type == "incremental":
@@ -58,6 +61,7 @@ def read(self) -> DataFrame:
)
return reader.run()

@slot
def write(self, df: DataFrame) -> None:
if self.transfer_dto.strategy.type == "incremental" and self.hwm and self.hwm.value:
self.transfer_dto.options["if_exists"] = "append"
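With support_hooks and slot now declared on DBHandler itself, the read() and write() slots are inherited by every DB handler, which is why the per-subclass pass-through overrides below are deleted. A minimal sketch of binding a hook to the shared slot, assuming onetl's hook registration API (hook plus the slot's bind decorator), which is not shown in this diff; the callback name and log message are illustrative only:

    from onetl.hooks import hook

    from syncmaster.worker.handlers.db.base import DBHandler


    # Hypothetical callback: because the slot is declared once on the base class,
    # it applies to every subclass (Clickhouse, Hive, Iceberg, MSSQL, MySQL,
    # Oracle, Postgres) whenever .read() is called.
    @DBHandler.read.bind
    @hook
    def log_read(handler: DBHandler):
        print(f"reading table {handler.transfer_dto.table_name}")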
7 changes: 1 addition & 6 deletions syncmaster/worker/handlers/db/clickhouse.py
@@ -7,7 +7,7 @@

from onetl.connection import Clickhouse
from onetl.db import DBWriter
from onetl.hooks import slot, support_hooks
from onetl.hooks import slot

from syncmaster.dto.connections import ClickhouseConnectionDTO
from syncmaster.dto.transfers import ClickhouseTransferDTO
@@ -18,7 +18,6 @@
from pyspark.sql.dataframe import DataFrame


@support_hooks
class ClickhouseHandler(DBHandler):
connection: Clickhouse
connection_dto: ClickhouseConnectionDTO
@@ -42,10 +41,6 @@ def connect(self, spark: SparkSession):
spark=spark,
).check()

@slot
def read(self) -> DataFrame:
return super().read()

@slot
def write(self, df: DataFrame) -> None:
normalized_df = self._normalize_column_names(df)
4 changes: 0 additions & 4 deletions syncmaster/worker/handlers/db/hive.py
@@ -38,10 +38,6 @@ def read(self) -> DataFrame:
self.connection.spark.catalog.refreshTable(self.transfer_dto.table_name)
return super().read()

@slot
def write(self, df: DataFrame) -> None:
return super().write(df)

def _normalize_column_names(self, df: DataFrame) -> DataFrame:
for column_name in df.columns:
df = df.withColumnRenamed(column_name, column_name.lower())
10 changes: 0 additions & 10 deletions syncmaster/worker/handlers/db/iceberg.py
@@ -6,7 +6,6 @@
from typing import TYPE_CHECKING

from onetl.connection import Iceberg
from onetl.hooks import slot, support_hooks

from syncmaster.dto.connections import IcebergRESTCatalogS3ConnectionDTO
from syncmaster.dto.transfers import IcebergRESTCatalogS3TransferDTO
@@ -17,7 +16,6 @@
from pyspark.sql.dataframe import DataFrame


@support_hooks
class IcebergRESTCatalogS3Handler(DBHandler):
connection: Iceberg
connection_dto: IcebergRESTCatalogS3ConnectionDTO
@@ -51,14 +49,6 @@ def connect(self, spark: SparkSession):
),
).check()

@slot
def read(self) -> DataFrame:
return super().read()

@slot
def write(self, df: DataFrame) -> None:
return super().write(df)

def _normalize_column_names(self, df: DataFrame) -> DataFrame:
for column_name in df.columns:
df = df.withColumnRenamed(column_name, column_name.lower())
10 changes: 0 additions & 10 deletions syncmaster/worker/handlers/db/mssql.py
@@ -6,7 +6,6 @@
from typing import TYPE_CHECKING

from onetl.connection import MSSQL
from onetl.hooks import slot, support_hooks

from syncmaster.dto.connections import MSSQLConnectionDTO
from syncmaster.dto.transfers import MSSQLTransferDTO
@@ -17,7 +16,6 @@
from pyspark.sql.dataframe import DataFrame


@support_hooks
class MSSQLHandler(DBHandler):
connection: MSSQL
connection_dto: MSSQLConnectionDTO
@@ -40,14 +38,6 @@ def connect(self, spark: SparkSession):
spark=spark,
).check()

@slot
def read(self) -> DataFrame:
return super().read()

@slot
def write(self, df: DataFrame) -> None:
return super().write(df)

def _normalize_column_names(self, df: DataFrame) -> DataFrame:
for column_name in df.columns:
df = df.withColumnRenamed(column_name, column_name.lower())
10 changes: 0 additions & 10 deletions syncmaster/worker/handlers/db/mysql.py
@@ -6,7 +6,6 @@
from typing import TYPE_CHECKING

from onetl.connection import MySQL
from onetl.hooks import slot, support_hooks

from syncmaster.dto.connections import MySQLConnectionDTO
from syncmaster.dto.transfers import MySQLTransferDTO
@@ -17,7 +16,6 @@
from pyspark.sql.dataframe import DataFrame


@support_hooks
class MySQLHandler(DBHandler):
connection: MySQL
connection_dto: MySQLConnectionDTO
@@ -37,14 +35,6 @@ def connect(self, spark: SparkSession):
spark=spark,
).check()

@slot
def read(self) -> DataFrame:
return super().read()

@slot
def write(self, df: DataFrame) -> None:
return super().write(df)

def _normalize_column_names(self, df: DataFrame) -> DataFrame:
for column_name in df.columns:
df = df.withColumnRenamed(column_name, column_name.lower())
10 changes: 0 additions & 10 deletions syncmaster/worker/handlers/db/oracle.py
@@ -6,7 +6,6 @@
from typing import TYPE_CHECKING

from onetl.connection import Oracle
from onetl.hooks import slot, support_hooks

from syncmaster.dto.connections import OracleConnectionDTO
from syncmaster.dto.transfers import OracleTransferDTO
@@ -17,7 +16,6 @@
from pyspark.sql.dataframe import DataFrame


@support_hooks
class OracleHandler(DBHandler):
connection: Oracle
connection_dto: OracleConnectionDTO
@@ -39,14 +37,6 @@ def connect(self, spark: SparkSession):
spark=spark,
).check()

@slot
def read(self) -> DataFrame:
return super().read()

@slot
def write(self, df: DataFrame) -> None:
return super().write(df)

def _normalize_column_names(self, df: DataFrame) -> DataFrame:
for column_name in df.columns:
df = df.withColumnRenamed(column_name, column_name.upper())
10 changes: 0 additions & 10 deletions syncmaster/worker/handlers/db/postgres.py
@@ -6,7 +6,6 @@
from typing import TYPE_CHECKING

from onetl.connection import Postgres
from onetl.hooks import slot, support_hooks

from syncmaster.dto.connections import PostgresConnectionDTO
from syncmaster.dto.transfers import PostgresTransferDTO
@@ -17,7 +16,6 @@
from pyspark.sql.dataframe import DataFrame


@support_hooks
class PostgresHandler(DBHandler):
connection: Postgres
connection_dto: PostgresConnectionDTO
@@ -38,14 +36,6 @@ def connect(self, spark: SparkSession):
spark=spark,
).check()

@slot
def read(self) -> DataFrame:
return super().read()

@slot
def write(self, df: DataFrame) -> None:
return super().write(df)

def _normalize_column_names(self, df: DataFrame) -> DataFrame:
for column_name in df.columns:
df = df.withColumnRenamed(column_name, column_name.lower())
13 changes: 1 addition & 12 deletions syncmaster/worker/handlers/file/ftp.py
@@ -5,16 +5,13 @@

from typing import TYPE_CHECKING

from onetl.hooks import slot, support_hooks

from syncmaster.dto.connections import FTPConnectionDTO
from syncmaster.worker.handlers.file.local_df import LocalDFFileHandler

if TYPE_CHECKING:
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import SparkSession


@support_hooks
class FTPHandler(LocalDFFileHandler):
connection_dto: FTPConnectionDTO

@@ -31,11 +28,3 @@ def connect(self, spark: SparkSession) -> None:
self.local_df_connection = SparkLocalFS(
spark=spark,
).check()

@slot
def read(self) -> DataFrame:
return super().read()

@slot
def write(self, df: DataFrame) -> None:
return super().write(df)
13 changes: 1 addition & 12 deletions syncmaster/worker/handlers/file/ftps.py
@@ -5,16 +5,13 @@

from typing import TYPE_CHECKING

from onetl.hooks import slot, support_hooks

from syncmaster.dto.connections import FTPSConnectionDTO
from syncmaster.worker.handlers.file.local_df import LocalDFFileHandler

if TYPE_CHECKING:
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import SparkSession


@support_hooks
class FTPSHandler(LocalDFFileHandler):
connection_dto: FTPSConnectionDTO

@@ -31,11 +28,3 @@ def connect(self, spark: SparkSession) -> None:
self.local_df_connection = SparkLocalFS(
spark=spark,
).check()

@slot
def read(self) -> DataFrame:
return super().read()

@slot
def write(self, df: DataFrame) -> None:
return super().write(df)
13 changes: 1 addition & 12 deletions syncmaster/worker/handlers/file/hdfs.py
@@ -5,16 +5,13 @@

from typing import TYPE_CHECKING

from onetl.hooks import slot, support_hooks

from syncmaster.dto.connections import HDFSConnectionDTO
from syncmaster.worker.handlers.file.remote_df import RemoteDFFileHandler

if TYPE_CHECKING:
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import SparkSession


@support_hooks
class HDFSHandler(RemoteDFFileHandler):
connection_dto: HDFSConnectionDTO

@@ -31,11 +28,3 @@ def connect(self, spark: SparkSession):
user=self.connection_dto.user,
password=self.connection_dto.password,
).check()

@slot
def read(self) -> DataFrame:
return super().read()

@slot
def write(self, df: DataFrame) -> None:
return super().write(df)
5 changes: 4 additions & 1 deletion syncmaster/worker/handlers/file/local_df.py
@@ -9,15 +9,18 @@
from etl_entities.hwm import FileListHWM, FileModifiedTimeHWM
from onetl.file import FileDFReader, FileDFWriter, FileDownloader, FileUploader
from onetl.file.filter import FileSizeRange, Glob, Regexp
from onetl.hooks import slot, support_hooks

from syncmaster.worker.handlers.file.base import FileHandler

if TYPE_CHECKING:
from pyspark.sql import DataFrame


@support_hooks
class LocalDFFileHandler(FileHandler):

@slot
def read(self) -> DataFrame:
from pyspark.sql.types import StructType

@@ -59,8 +62,8 @@ def read(self) -> DataFrame:

return df

@slot
def write(self, df: DataFrame) -> None:

writer = FileDFWriter(
connection=self.local_df_connection,
format=self.transfer_dto.file_format,
4 changes: 4 additions & 0 deletions syncmaster/worker/handlers/file/remote_df.py
@@ -7,15 +7,18 @@

from onetl.file import FileDFReader, FileDFWriter, FileMover
from onetl.file.filter import Glob
from onetl.hooks import slot, support_hooks

from syncmaster.worker.handlers.file.base import FileHandler

if TYPE_CHECKING:
from pyspark.sql import DataFrame


@support_hooks
class RemoteDFFileHandler(FileHandler):

@slot
def read(self) -> DataFrame:
from pyspark.sql.types import StructType

@@ -38,6 +41,7 @@ def read(self) -> DataFrame:

return df

@slot
def write(self, df: DataFrame) -> None:
tmp_path = os.path.join(self.transfer_dto.directory_path, ".tmp", str(self.run_dto.id))
try:
4 changes: 0 additions & 4 deletions syncmaster/worker/handlers/file/s3.py
@@ -70,7 +70,3 @@ def read(self) -> DataFrame:
df = df.selectExpr(*columns_filter_expressions)

return df

@slot
def write(self, df: DataFrame) -> None:
return super().write(df)