1 change: 1 addition & 0 deletions docs/changelog/next_release/284.feature.rst
@@ -0,0 +1 @@
Added support for Iceberg with REST catalog and S3 warehouse
72 changes: 38 additions & 34 deletions poetry.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion pyproject.toml
@@ -56,7 +56,8 @@ pyjwt = { version = "^2.10.1", optional = true }
 jinja2 = { version = "^3.1.6", optional = true }
 python-multipart = { version = "^0.0.20", optional = true }
 celery = { version = "^5.5.0", optional = true }
-onetl = { version = ">=0.13.5,<0.15.0", extras = ["all"], optional = true }
+onetl = { git = "https://github.com/MobileTeleSystems/onetl.git", branch = "develop", extras = ["all"], optional = true }
+# TODO: revert before next syncmaster release
 pyspark = { version = "<4.0.0", optional = true }
 pyyaml = { version = "*", optional = true }
 psycopg2-binary = { version = "^2.9.10", optional = true }
2 changes: 1 addition & 1 deletion syncmaster/db/models/connection.py
@@ -18,7 +18,7 @@
 class ConnectionType(StrEnum):
     POSTGRES = "postgres"
     HIVE = "hive"
-    ICEBERG = "iceberg_rest_s3"
+    ICEBERG_REST_S3 = "iceberg_rest_s3"
     ORACLE = "oracle"
     CLICKHOUSE = "clickhouse"
     MSSQL = "mssql"
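Because ConnectionType is a StrEnum, the rename only touches the member name, not the stored value — a minimal sketch:

from enum import StrEnum  # Python 3.11+

class ConnectionType(StrEnum):
    ICEBERG_REST_S3 = "iceberg_rest_s3"

# StrEnum members compare equal to their values, so rows already
# stored as "iceberg_rest_s3" keep resolving to the renamed member.
assert ConnectionType.ICEBERG_REST_S3 == "iceberg_rest_s3"
assert ConnectionType("iceberg_rest_s3") is ConnectionType.ICEBERG_REST_S3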
17 changes: 17 additions & 0 deletions syncmaster/dto/connections.py
@@ -73,6 +73,23 @@ class HiveConnectionDTO(ConnectionDTO):
     type: ClassVar[str] = "hive"
 
 
+@dataclass
+class IcebergRESTCatalogS3ConnectionDTO(ConnectionDTO):
+    metastore_url: str
+    s3_warehouse_path: str
+    s3_host: str
+    s3_bucket: str
+    s3_region: str
+    s3_access_key: str
+    s3_secret_key: str
+    metastore_username: str
+    metastore_password: str
+    s3_port: int | None = None
+    s3_protocol: str = "https"
+    s3_path_style_access: bool = False
+    type: ClassVar[str] = "iceberg_rest_s3"
+
+
 @dataclass
 class HDFSConnectionDTO(ConnectionDTO):
     user: str
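For illustration, a hypothetical instantiation of the new DTO — all values are made up, and it is assumed the ConnectionDTO base adds no further required fields:

from syncmaster.dto.connections import IcebergRESTCatalogS3ConnectionDTO

dto = IcebergRESTCatalogS3ConnectionDTO(
    metastore_url="https://rest.domain.com",
    s3_warehouse_path="s3a://warehouse/iceberg",
    s3_host="s3.domain.com",
    s3_bucket="warehouse",
    s3_region="us-east-1",
    s3_access_key="ACCESS_KEY",
    s3_secret_key="SECRET_KEY",
    metastore_username="syncmaster",
    metastore_password="changeme",
    # optional fields keep their defaults:
    # s3_port=None, s3_protocol="https", s3_path_style_access=False
)
assert dto.type == "iceberg_rest_s3"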
13 changes: 12 additions & 1 deletion syncmaster/dto/transfers.py
@@ -1,8 +1,9 @@
 # SPDX-FileCopyrightText: 2023-2024 MTS PJSC
 # SPDX-License-Identifier: Apache-2.0
 import json
-from dataclasses import dataclass
+from dataclasses import dataclass, field
 from typing import ClassVar
+from uuid import uuid4
 
 from onetl.file.format import CSV, JSON, ORC, XML, Excel, JSONLine, Parquet
 
@@ -107,6 +108,16 @@ def __post_init__(self):
         self.options.setdefault("if_exists", "replace_overlapping_partitions")
 
 
+@dataclass
+class IcebergRESTCatalogS3TransferDTO(DBTransferDTO):
+    type: ClassVar[str] = "iceberg_rest_s3"
+    catalog_name: str = field(default_factory=lambda: f"iceberg_rest_s3_{uuid4().hex[:8]}")
+
+    def __post_init__(self):
+        super().__post_init__()
+        self.options.setdefault("if_exists", "replace_overlapping_partitions")
+
+
 @dataclass
 class S3TransferDTO(FileTransferDTO):
     type: ClassVar[str] = "s3"
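The default_factory gives each transfer its own Spark catalog name, which looks intended to keep concurrently registered catalogs distinct; the factory in isolation:

from uuid import uuid4

# Mirrors the default_factory above: "iceberg_rest_s3_" plus 8 hex chars.
catalog_name = f"iceberg_rest_s3_{uuid4().hex[:8]}"
print(catalog_name)  # e.g. iceberg_rest_s3_1f3a9c2e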
2 changes: 1 addition & 1 deletion syncmaster/schemas/v1/connection_types.py
@@ -3,7 +3,7 @@
 from typing import Literal
 
 HIVE_TYPE = Literal["hive"]
-ICEBERG_TYPE = Literal["iceberg_rest_s3"]
+ICEBERG_REST_S3_TYPE = Literal["iceberg_rest_s3"]
 ORACLE_TYPE = Literal["oracle"]
 POSTGRES_TYPE = Literal["postgres"]
 CLICKHOUSE_TYPE = Literal["clickhouse"]
12 changes: 9 additions & 3 deletions syncmaster/schemas/v1/connections/connection_base.py
@@ -2,11 +2,17 @@
 # SPDX-License-Identifier: Apache-2.0
 from pydantic import BaseModel, ConfigDict, Field
 
-from syncmaster.schemas.v1.auth import ReadBasicAuthSchema, ReadS3AuthSchema
-from syncmaster.schemas.v1.auth.samba import ReadSambaAuthSchema
+from syncmaster.schemas.v1.auth import (
+    ReadBasicAuthSchema,
+    ReadIcebergRESTCatalogBasicAuthSchema,
+    ReadS3AuthSchema,
+    ReadSambaAuthSchema,
+)
 from syncmaster.schemas.v1.types import NameConstr
 
-ReadConnectionAuthDataSchema = ReadBasicAuthSchema | ReadS3AuthSchema | ReadSambaAuthSchema
+ReadConnectionAuthDataSchema = (
+    ReadBasicAuthSchema | ReadS3AuthSchema | ReadSambaAuthSchema | ReadIcebergRESTCatalogBasicAuthSchema
+)
 
 
 class CreateConnectionBaseSchema(BaseModel):
6 changes: 3 additions & 3 deletions syncmaster/schemas/v1/connections/iceberg.py
@@ -10,7 +10,7 @@
     ReadIcebergRESTCatalogBasicAuthSchema,
     UpdateIcebergRESTCatalogBasicAuthSchema,
 )
-from syncmaster.schemas.v1.connection_types import ICEBERG_TYPE
+from syncmaster.schemas.v1.connection_types import ICEBERG_REST_S3_TYPE
 from syncmaster.schemas.v1.connections.connection_base import (
     CreateConnectionBaseSchema,
     ReadConnectionBaseSchema,
@@ -40,7 +40,7 @@ class ReadIcebergRESTCatalogS3ConnectionDataSchema(BaseModel):
 
 
 class CreateIcebergConnectionSchema(CreateConnectionBaseSchema):
-    type: ICEBERG_TYPE = Field(description="Connection type")
+    type: ICEBERG_REST_S3_TYPE = Field(description="Connection type")
     data: CreateIcebergRESTCatalogS3ConnectionDataSchema = Field(
         ...,
         alias="connection_data",
@@ -54,7 +54,7 @@ class CreateIcebergConnectionSchema(CreateConnectionBaseSchema):
 
 
 class ReadIcebergConnectionSchema(ReadConnectionBaseSchema):
-    type: ICEBERG_TYPE
+    type: ICEBERG_REST_S3_TYPE
     data: ReadIcebergRESTCatalogS3ConnectionDataSchema = Field(alias="connection_data")
     auth_data: ReadIcebergRESTCatalogBasicAuthSchema | None = None
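For orientation, a hypothetical create-connection payload. Only the type literal and the connection_data alias are confirmed by this diff; the inner keys are assumed to mirror IcebergRESTCatalogS3ConnectionDTO:

payload = {
    "name": "my-iceberg-connection",  # NameConstr-validated field from the base schema (assumed)
    "type": "iceberg_rest_s3",
    "connection_data": {  # alias for the `data` field
        "metastore_url": "https://rest.domain.com",
        "s3_warehouse_path": "s3a://warehouse/iceberg",  # assumed key
        "s3_host": "s3.domain.com",                      # assumed key
        "s3_bucket": "warehouse",                        # assumed key
        "s3_region": "us-east-1",                        # assumed key
    },
}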
13 changes: 12 additions & 1 deletion syncmaster/worker/controller.py
@@ -17,6 +17,7 @@
     FTPSConnectionDTO,
     HDFSConnectionDTO,
     HiveConnectionDTO,
+    IcebergRESTCatalogS3ConnectionDTO,
     MSSQLConnectionDTO,
     MySQLConnectionDTO,
     OracleConnectionDTO,
@@ -33,6 +34,7 @@
     FTPTransferDTO,
     HDFSTransferDTO,
     HiveTransferDTO,
+    IcebergRESTCatalogS3TransferDTO,
     MSSQLTransferDTO,
     MySQLTransferDTO,
     OracleTransferDTO,
@@ -45,10 +47,13 @@
 from syncmaster.dto.transfers_resources import Resources
 from syncmaster.dto.transfers_strategy import Strategy
 from syncmaster.exceptions.connection import ConnectionTypeNotRecognizedError
-from syncmaster.schemas.v1.connection_types import FILE_CONNECTION_TYPES
+from syncmaster.schemas.v1.connection_types import (
+    FILE_CONNECTION_TYPES,
+)
 from syncmaster.worker.handlers.base import Handler
 from syncmaster.worker.handlers.db.clickhouse import ClickhouseHandler
 from syncmaster.worker.handlers.db.hive import HiveHandler
+from syncmaster.worker.handlers.db.iceberg import IcebergRESTCatalogS3Handler
 from syncmaster.worker.handlers.db.mssql import MSSQLHandler
 from syncmaster.worker.handlers.db.mysql import MySQLHandler
 from syncmaster.worker.handlers.db.oracle import OracleHandler
@@ -72,6 +77,12 @@
         HiveTransferDTO,
         RunDTO,
     ),
+    "iceberg_rest_s3": (
+        IcebergRESTCatalogS3Handler,
+        IcebergRESTCatalogS3ConnectionDTO,
+        IcebergRESTCatalogS3TransferDTO,
+        RunDTO,
+    ),
     "oracle": (
         OracleHandler,
         OracleConnectionDTO,
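The tuple layout makes the controller's dispatch a plain dict lookup; a minimal sketch (the mapping's real name lies outside this hunk, so HANDLERS below is hypothetical):

# HANDLERS is a hypothetical stand-in for the mapping extended above;
# the DTO and handler classes are the ones imported in this file.
HANDLERS = {
    "iceberg_rest_s3": (
        IcebergRESTCatalogS3Handler,
        IcebergRESTCatalogS3ConnectionDTO,
        IcebergRESTCatalogS3TransferDTO,
        RunDTO,
    ),
}

handler_cls, connection_dto_cls, transfer_dto_cls, run_dto_cls = HANDLERS["iceberg_rest_s3"]
assert handler_cls is IcebergRESTCatalogS3Handler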
91 changes: 91 additions & 0 deletions syncmaster/worker/handlers/db/iceberg.py
@@ -0,0 +1,91 @@
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

from typing import TYPE_CHECKING

from onetl.connection import Iceberg
from onetl.hooks import slot, support_hooks

from syncmaster.dto.connections import IcebergRESTCatalogS3ConnectionDTO
from syncmaster.dto.transfers import IcebergRESTCatalogS3TransferDTO
from syncmaster.worker.handlers.db.base import DBHandler

if TYPE_CHECKING:
    from pyspark.sql import SparkSession
    from pyspark.sql.dataframe import DataFrame


@support_hooks
class IcebergRESTCatalogS3Handler(DBHandler):
    connection: Iceberg
    connection_dto: IcebergRESTCatalogS3ConnectionDTO
    transfer_dto: IcebergRESTCatalogS3TransferDTO
    _operators = {
        "regexp": "RLIKE",
        **DBHandler._operators,
    }

    def connect(self, spark: SparkSession):
        self.connection = Iceberg(
            spark=spark,
            catalog_name=self.transfer_dto.catalog_name,
            catalog=Iceberg.RESTCatalog(
                uri=self.connection_dto.metastore_url,
                auth=Iceberg.RESTCatalog.BasicAuth(
                    user=self.connection_dto.metastore_username,
                    password=self.connection_dto.metastore_password,
                ),
            ),
            warehouse=Iceberg.S3Warehouse(
                path=self.connection_dto.s3_warehouse_path,
                host=self.connection_dto.s3_host,
                port=self.connection_dto.s3_port,
                protocol=self.connection_dto.s3_protocol,
                bucket=self.connection_dto.s3_bucket,
                path_style_access=self.connection_dto.s3_path_style_access,
                region=self.connection_dto.s3_region,
                access_key=self.connection_dto.s3_access_key,
                secret_key=self.connection_dto.s3_secret_key,
            ),
        ).check()

    @slot
    def read(self) -> DataFrame:
        return super().read()

    @slot
    def write(self, df: DataFrame) -> None:
        return super().write(df)

    def _normalize_column_names(self, df: DataFrame) -> DataFrame:
        for column_name in df.columns:
            df = df.withColumnRenamed(column_name, column_name.lower())
        return df

    def _make_rows_filter_expression(self, filters: list[dict]) -> str | None:
        expressions = []
        for filter in filters:
            op = self._operators[filter["type"]]
            field = self._quote_field(filter["field"])
            value = filter.get("value")

            if value is None:
                expressions.append(f"{field} {op}")
                continue

            if op == "ILIKE":
                expressions.append(f"LOWER({field}) LIKE LOWER('{value}')")
            elif op == "NOT ILIKE":
                expressions.append(f"NOT LOWER({field}) LIKE LOWER('{value}')")
            else:
                expressions.append(f"{field} {op} '{value}'")

        return " AND ".join(expressions) or None

    def _get_reading_options(self) -> dict:
        return {}

    def _quote_field(self, field: str) -> str:
        return f"`{field}`"
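A quick check of `_make_rows_filter_expression`, given a handler instance; "regexp" is the only operator this handler defines itself, the rest come from DBHandler._operators:

expression = handler._make_rows_filter_expression(
    [{"type": "regexp", "field": "user_name", "value": "^admin.*"}],
)
assert expression == "`user_name` RLIKE '^admin.*'"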
@@ -31,7 +31,7 @@ async def group_connections(
                 "cluster": "cluster",
             },
         )
-    elif conn_type == ConnectionType.ICEBERG:
+    elif conn_type == ConnectionType.ICEBERG_REST_S3:
         new_data.update(
             {
                 "metastore_url": "https://rest.domain.com",