2 changes: 2 additions & 0 deletions docs/changelog/next_release/279.feature.rst
@@ -0,0 +1,2 @@
+Added slots for data-rentgen plugin integration
+-- by :github:user:`marashka`
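
For context, every change in this PR uses onetl's hooks mechanism: a class decorated with @support_hooks exposes its @slot methods as attachment points that external code can bind callbacks to. Below is a minimal, self-contained sketch of that pattern, based on onetl's documented hooks API; the Greeter and on_greet names are purely illustrative:

from onetl.hooks import hook, slot, support_hooks


@support_hooks  # collect the @slot methods so hooks can be bound to them
class Greeter:
    @slot  # expose this method as a hook attachment point
    def greet(self, name: str) -> str:
        return f"Hello, {name}!"


@Greeter.greet.bind  # register a callback on the slot
@hook
def on_greet(self, name: str):
    # Called with the same arguments as the slot method itself.
    print(f"greet() was called with {name!r}")


Greeter().greet("world")  # fires on_greet and returns "Hello, world!"

This is what lets a plugin such as data-rentgen observe SyncMaster transfers without SyncMaster ever importing the plugin.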
3 changes: 3 additions & 0 deletions syncmaster/worker/controller.py
@@ -7,6 +7,7 @@
 from etl_entities.hwm_store import BaseHWMStore
 from horizon.client.auth import LoginPassword
 from horizon_hwm_store import HorizonHWMStore
+from onetl.hooks import slot, support_hooks
 from onetl.strategy import IncrementalStrategy
 
 from syncmaster.db.models import Connection, Run
@@ -146,6 +147,7 @@
 }
 
 
+@support_hooks
 class TransferController:
     settings: WorkerAppSettings
     source_handler: Handler
@@ -187,6 +189,7 @@ def __init__(
             temp_dir=TemporaryDirectory(dir=self.temp_dir.name, prefix="written_"),
         )
 
+    @slot
     def perform_transfer(self) -> None:
         try:
             spark = self.settings.worker.CREATE_SPARK_SESSION_FUNCTION(
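
With perform_transfer exposed as a slot, a plugin can react to every transfer run. A hedged sketch of what a consumer could look like (the callback name and its body are assumptions; what run metadata the controller exposes to the hook depends on its attributes):

from onetl.hooks import hook

from syncmaster.worker.controller import TransferController


@TransferController.perform_transfer.bind
@hook
def on_transfer(controller: TransferController):
    # Receives the slot's arguments, here just the controller instance.
    # A lineage plugin could open a data-rentgen run record at this point.
    print("perform_transfer started")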
7 changes: 7 additions & 0 deletions syncmaster/worker/handlers/db/clickhouse.py
@@ -7,6 +7,7 @@
 
 from onetl.connection import Clickhouse
 from onetl.db import DBWriter
+from onetl.hooks import slot, support_hooks
 
 from syncmaster.dto.connections import ClickhouseConnectionDTO
 from syncmaster.dto.transfers import ClickhouseTransferDTO
@@ -17,6 +18,7 @@
     from pyspark.sql.dataframe import DataFrame
 
 
+@support_hooks
 class ClickhouseHandler(DBHandler):
     connection: Clickhouse
     connection_dto: ClickhouseConnectionDTO
@@ -40,6 +42,11 @@ def connect(self, spark: SparkSession):
             spark=spark,
         ).check()
 
+    @slot
+    def read(self) -> DataFrame:
+        return super().read()
+
+    @slot
     def write(self, df: DataFrame) -> None:
         normalized_df = self._normalize_column_names(df)
         sort_column = next(
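
Note that read here only delegates to super(); the override apparently exists so that each handler class owns a distinct slot, letting hooks target one source or target type specifically. A sketch of binding to Clickhouse writes (the hook body is an assumption):

from onetl.hooks import hook

from syncmaster.worker.handlers.db.clickhouse import ClickhouseHandler


@ClickhouseHandler.write.bind
@hook
def on_clickhouse_write(handler: ClickhouseHandler, df):
    # df is the DataFrame about to be written by the slot method.
    print(f"writing {len(df.columns)} columns via {type(handler).__name__}")

The remaining handler files below receive the same treatment: import the decorators, mark the class with @support_hooks, and expose read/write as slots.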
7 changes: 7 additions & 0 deletions syncmaster/worker/handlers/db/hive.py
@@ -6,6 +6,7 @@
 from typing import TYPE_CHECKING
 
 from onetl.connection import Hive
+from onetl.hooks import slot, support_hooks
 
 from syncmaster.dto.connections import HiveConnectionDTO
 from syncmaster.dto.transfers import HiveTransferDTO
@@ -16,6 +17,7 @@
     from pyspark.sql.dataframe import DataFrame
 
 
+@support_hooks
 class HiveHandler(DBHandler):
     connection: Hive
     connection_dto: HiveConnectionDTO
@@ -31,10 +33,15 @@ def connect(self, spark: SparkSession):
             spark=spark,
         ).check()
 
+    @slot
     def read(self) -> DataFrame:
         self.connection.spark.catalog.refreshTable(self.transfer_dto.table_name)
         return super().read()
 
+    @slot
+    def write(self, df: DataFrame) -> None:
+        return super().write(df)
+
     def _normalize_column_names(self, df: DataFrame) -> DataFrame:
         for column_name in df.columns:
             df = df.withColumnRenamed(column_name, column_name.lower())
10 changes: 10 additions & 0 deletions syncmaster/worker/handlers/db/mssql.py
@@ -6,6 +6,7 @@
 from typing import TYPE_CHECKING
 
 from onetl.connection import MSSQL
+from onetl.hooks import slot, support_hooks
 
 from syncmaster.dto.connections import MSSQLConnectionDTO
 from syncmaster.dto.transfers import MSSQLTransferDTO
@@ -16,6 +17,7 @@
     from pyspark.sql.dataframe import DataFrame
 
 
+@support_hooks
 class MSSQLHandler(DBHandler):
     connection: MSSQL
     connection_dto: MSSQLConnectionDTO
@@ -38,6 +40,14 @@ def connect(self, spark: SparkSession):
             spark=spark,
         ).check()
 
+    @slot
+    def read(self) -> DataFrame:
+        return super().read()
+
+    @slot
+    def write(self, df: DataFrame) -> None:
+        return super().write(df)
+
     def _normalize_column_names(self, df: DataFrame) -> DataFrame:
         for column_name in df.columns:
             df = df.withColumnRenamed(column_name, column_name.lower())
10 changes: 10 additions & 0 deletions syncmaster/worker/handlers/db/mysql.py
@@ -6,6 +6,7 @@
 from typing import TYPE_CHECKING
 
 from onetl.connection import MySQL
+from onetl.hooks import slot, support_hooks
 
 from syncmaster.dto.connections import MySQLConnectionDTO
 from syncmaster.dto.transfers import MySQLTransferDTO
@@ -16,6 +17,7 @@
     from pyspark.sql.dataframe import DataFrame
 
 
+@support_hooks
 class MySQLHandler(DBHandler):
     connection: MySQL
     connection_dto: MySQLConnectionDTO
@@ -35,6 +37,14 @@ def connect(self, spark: SparkSession):
             spark=spark,
         ).check()
 
+    @slot
+    def read(self) -> DataFrame:
+        return super().read()
+
+    @slot
+    def write(self, df: DataFrame) -> None:
+        return super().write(df)
+
     def _normalize_column_names(self, df: DataFrame) -> DataFrame:
         for column_name in df.columns:
             df = df.withColumnRenamed(column_name, column_name.lower())
10 changes: 10 additions & 0 deletions syncmaster/worker/handlers/db/oracle.py
@@ -6,6 +6,7 @@
 from typing import TYPE_CHECKING
 
 from onetl.connection import Oracle
+from onetl.hooks import slot, support_hooks
 
 from syncmaster.dto.connections import OracleConnectionDTO
 from syncmaster.dto.transfers import OracleTransferDTO
@@ -16,6 +17,7 @@
     from pyspark.sql.dataframe import DataFrame
 
 
+@support_hooks
 class OracleHandler(DBHandler):
     connection: Oracle
     connection_dto: OracleConnectionDTO
@@ -37,6 +39,14 @@ def connect(self, spark: SparkSession):
             spark=spark,
         ).check()
 
+    @slot
+    def read(self) -> DataFrame:
+        return super().read()
+
+    @slot
+    def write(self, df: DataFrame) -> None:
+        return super().write(df)
+
     def _normalize_column_names(self, df: DataFrame) -> DataFrame:
         for column_name in df.columns:
             df = df.withColumnRenamed(column_name, column_name.upper())
10 changes: 10 additions & 0 deletions syncmaster/worker/handlers/db/postgres.py
@@ -6,6 +6,7 @@
 from typing import TYPE_CHECKING
 
 from onetl.connection import Postgres
+from onetl.hooks import slot, support_hooks
 
 from syncmaster.dto.connections import PostgresConnectionDTO
 from syncmaster.dto.transfers import PostgresTransferDTO
@@ -16,6 +17,7 @@
     from pyspark.sql.dataframe import DataFrame
 
 
+@support_hooks
 class PostgresHandler(DBHandler):
     connection: Postgres
     connection_dto: PostgresConnectionDTO
@@ -36,6 +38,14 @@ def connect(self, spark: SparkSession):
             spark=spark,
         ).check()
 
+    @slot
+    def read(self) -> DataFrame:
+        return super().read()
+
+    @slot
+    def write(self, df: DataFrame) -> None:
+        return super().write(df)
+
     def _normalize_column_names(self, df: DataFrame) -> DataFrame:
         for column_name in df.columns:
             df = df.withColumnRenamed(column_name, column_name.lower())
13 changes: 12 additions & 1 deletion syncmaster/worker/handlers/file/ftp.py
@@ -5,13 +5,16 @@
 
 from typing import TYPE_CHECKING
 
+from onetl.hooks import slot, support_hooks
+
 from syncmaster.dto.connections import FTPConnectionDTO
 from syncmaster.worker.handlers.file.local_df import LocalDFFileHandler
 
 if TYPE_CHECKING:
-    from pyspark.sql import SparkSession
+    from pyspark.sql import DataFrame, SparkSession
 
 
+@support_hooks
 class FTPHandler(LocalDFFileHandler):
     connection_dto: FTPConnectionDTO
 
@@ -28,3 +31,11 @@ def connect(self, spark: SparkSession) -> None:
         self.local_df_connection = SparkLocalFS(
             spark=spark,
         ).check()
+
+    @slot
+    def read(self) -> DataFrame:
+        return super().read()
+
+    @slot
+    def write(self, df: DataFrame) -> None:
+        return super().write(df)
13 changes: 12 additions & 1 deletion syncmaster/worker/handlers/file/ftps.py
@@ -5,13 +5,16 @@
 
 from typing import TYPE_CHECKING
 
+from onetl.hooks import slot, support_hooks
+
 from syncmaster.dto.connections import FTPSConnectionDTO
 from syncmaster.worker.handlers.file.local_df import LocalDFFileHandler
 
 if TYPE_CHECKING:
-    from pyspark.sql import SparkSession
+    from pyspark.sql import DataFrame, SparkSession
 
 
+@support_hooks
 class FTPSHandler(LocalDFFileHandler):
     connection_dto: FTPSConnectionDTO
 
@@ -28,3 +31,11 @@ def connect(self, spark: SparkSession) -> None:
         self.local_df_connection = SparkLocalFS(
             spark=spark,
         ).check()
+
+    @slot
+    def read(self) -> DataFrame:
+        return super().read()
+
+    @slot
+    def write(self, df: DataFrame) -> None:
+        return super().write(df)
13 changes: 12 additions & 1 deletion syncmaster/worker/handlers/file/hdfs.py
@@ -5,13 +5,16 @@
 
 from typing import TYPE_CHECKING
 
+from onetl.hooks import slot, support_hooks
+
 from syncmaster.dto.connections import HDFSConnectionDTO
 from syncmaster.worker.handlers.file.remote_df import RemoteDFFileHandler
 
 if TYPE_CHECKING:
-    from pyspark.sql import SparkSession
+    from pyspark.sql import DataFrame, SparkSession
 
 
+@support_hooks
 class HDFSHandler(RemoteDFFileHandler):
     connection_dto: HDFSConnectionDTO
 
@@ -28,3 +31,11 @@ def connect(self, spark: SparkSession):
             user=self.connection_dto.user,
             password=self.connection_dto.password,
         ).check()
+
+    @slot
+    def read(self) -> DataFrame:
+        return super().read()
+
+    @slot
+    def write(self, df: DataFrame) -> None:
+        return super().write(df)
7 changes: 7 additions & 0 deletions syncmaster/worker/handlers/file/s3.py
@@ -6,6 +6,7 @@
 from typing import TYPE_CHECKING
 
 from onetl.file import FileDFReader
+from onetl.hooks import slot, support_hooks
 
 from syncmaster.dto.connections import S3ConnectionDTO
 from syncmaster.worker.handlers.file.remote_df import RemoteDFFileHandler
@@ -14,6 +15,7 @@
     from pyspark.sql import DataFrame, SparkSession
 
 
+@support_hooks
 class S3Handler(RemoteDFFileHandler):
     connection_dto: S3ConnectionDTO
 
@@ -42,6 +44,7 @@ def connect(self, spark: SparkSession):
             region=self.connection_dto.region,
         ).check()
 
+    @slot
     def read(self) -> DataFrame:
         from pyspark.sql.types import StructType
 
@@ -67,3 +70,7 @@ def read(self) -> DataFrame:
         df = df.selectExpr(*columns_filter_expressions)
 
         return df
+
+    @slot
+    def write(self, df: DataFrame) -> None:
+        return super().write(df)
13 changes: 12 additions & 1 deletion syncmaster/worker/handlers/file/samba.py
@@ -5,13 +5,16 @@
 
 from typing import TYPE_CHECKING
 
+from onetl.hooks import slot, support_hooks
+
 from syncmaster.dto.connections import SambaConnectionDTO
 from syncmaster.worker.handlers.file.local_df import LocalDFFileHandler
 
 if TYPE_CHECKING:
-    from pyspark.sql import SparkSession
+    from pyspark.sql import DataFrame, SparkSession
 
 
+@support_hooks
 class SambaHandler(LocalDFFileHandler):
     connection_dto: SambaConnectionDTO
 
@@ -32,3 +35,11 @@ def connect(self, spark: SparkSession) -> None:
         self.local_df_connection = SparkLocalFS(
             spark=spark,
         ).check()
+
+    @slot
+    def read(self) -> DataFrame:
+        return super().read()
+
+    @slot
+    def write(self, df: DataFrame) -> None:
+        return super().write(df)
13 changes: 12 additions & 1 deletion syncmaster/worker/handlers/file/sftp.py
@@ -5,13 +5,16 @@
 
 from typing import TYPE_CHECKING
 
+from onetl.hooks import slot, support_hooks
+
 from syncmaster.dto.connections import SFTPConnectionDTO
 from syncmaster.worker.handlers.file.local_df import LocalDFFileHandler
 
 if TYPE_CHECKING:
-    from pyspark.sql import SparkSession
+    from pyspark.sql import DataFrame, SparkSession
 
 
+@support_hooks
 class SFTPHandler(LocalDFFileHandler):
     connection_dto: SFTPConnectionDTO
 
@@ -29,3 +32,11 @@ def connect(self, spark: SparkSession) -> None:
         self.local_df_connection = SparkLocalFS(
             spark=spark,
         ).check()
+
+    @slot
+    def read(self) -> DataFrame:
+        return super().read()
+
+    @slot
+    def write(self, df: DataFrame) -> None:
+        return super().write(df)