Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/changelog/next_release/282.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added Iceberg REST catalog (S3-backed) connection type to the API
1 change: 1 addition & 0 deletions syncmaster/db/models/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
class ConnectionType(StrEnum):
POSTGRES = "postgres"
HIVE = "hive"
ICEBERG = "iceberg_rest_s3"
ORACLE = "oracle"
CLICKHOUSE = "clickhouse"
MSSQL = "mssql"
Expand Down
10 changes: 10 additions & 0 deletions syncmaster/schemas/v1/auth/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@
ReadBasicAuthSchema,
UpdateBasicAuthSchema,
)
from syncmaster.schemas.v1.auth.iceberg_rest_basic import (
CreateIcebergRESTCatalogBasicAuthSchema,
IcebergRESTCatalogBasicAuthSchema,
ReadIcebergRESTCatalogBasicAuthSchema,
UpdateIcebergRESTCatalogBasicAuthSchema,
)
from syncmaster.schemas.v1.auth.s3 import (
CreateS3AuthSchema,
ReadS3AuthSchema,
Expand Down Expand Up @@ -35,4 +41,8 @@
"UpdateSambaAuthSchema",
"AuthTokenSchema",
"TokenPayloadSchema",
"IcebergRESTCatalogBasicAuthSchema",
"CreateIcebergRESTCatalogBasicAuthSchema",
"ReadIcebergRESTCatalogBasicAuthSchema",
"UpdateIcebergRESTCatalogBasicAuthSchema",
]
5 changes: 2 additions & 3 deletions syncmaster/schemas/v1/auth/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,5 @@ class ReadBasicAuthSchema(BasicAuthSchema):
class UpdateBasicAuthSchema(CreateBasicAuthSchema):
password: SecretStr | None = None

@property
def secret_field(self) -> str:
return "password"
def get_secret_fields(self) -> tuple[str, ...]:
return ("password",)
29 changes: 29 additions & 0 deletions syncmaster/schemas/v1/auth/iceberg_rest_basic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from typing import Literal

from pydantic import BaseModel, SecretStr


class IcebergRESTCatalogBasicAuthSchema(BaseModel):
    """Base schema for Iceberg REST catalog + S3 basic-auth credentials.

    The ``type`` literal is the discriminator tag used to select this auth
    schema inside the tagged union of connection auth schemas.
    """

    # Discriminator value: basic auth against the REST catalog, basic S3 keys.
    type: Literal["iceberg_rest_basic_s3_basic"]


class CreateIcebergRESTCatalogBasicAuthSchema(IcebergRESTCatalogBasicAuthSchema):
    """Create payload: all credential fields, including secrets, are required."""

    # Basic-auth credentials for the Iceberg REST catalog (metastore).
    metastore_username: str
    metastore_password: SecretStr  # SecretStr keeps the value masked in repr/logs
    # Access keys for the S3 warehouse storage.
    s3_access_key: str
    s3_secret_key: SecretStr


class ReadIcebergRESTCatalogBasicAuthSchema(IcebergRESTCatalogBasicAuthSchema):
    """Read (response) schema: secret fields are intentionally omitted."""

    metastore_username: str
    s3_access_key: str


class UpdateIcebergRESTCatalogBasicAuthSchema(CreateIcebergRESTCatalogBasicAuthSchema):
    """Update payload: secrets may be omitted (``None``) to keep stored values."""

    # None means "do not change this secret; reuse the stored value".
    metastore_password: SecretStr | None = None
    s3_secret_key: SecretStr | None = None

    def get_secret_fields(self) -> tuple[str, ...]:
        # Names of fields whose None value the update endpoint replaces with
        # the previously stored (encrypted) secret.
        return ("metastore_password", "s3_secret_key")
5 changes: 2 additions & 3 deletions syncmaster/schemas/v1/auth/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,5 @@ class ReadS3AuthSchema(S3AuthSchema):
class UpdateS3AuthSchema(CreateS3AuthSchema):
secret_key: SecretStr | None = None

@property
def secret_field(self) -> str:
return "secret_key"
def get_secret_fields(self) -> tuple[str, ...]:
return ("secret_key",)
5 changes: 2 additions & 3 deletions syncmaster/schemas/v1/auth/samba.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,5 @@ class ReadSambaAuthSchema(SambaAuthSchema):
class UpdateSambaAuthSchema(CreateSambaAuthSchema):
password: SecretStr | None = None

@property
def secret_field(self) -> str:
return "password"
def get_secret_fields(self) -> tuple[str, ...]:
return ("password",)
4 changes: 3 additions & 1 deletion syncmaster/schemas/v1/connection_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Literal

HIVE_TYPE = Literal["hive"]
ICEBERG_TYPE = Literal["iceberg_rest_s3"]
ORACLE_TYPE = Literal["oracle"]
POSTGRES_TYPE = Literal["postgres"]
CLICKHOUSE_TYPE = Literal["clickhouse"]
Expand All @@ -21,6 +22,7 @@
"postgres",
"clickhouse",
"hive",
"iceberg_rest_s3",
"mssql",
"mysql",
"s3",
Expand All @@ -32,4 +34,4 @@
"samba",
]
FILE_CONNECTION_TYPES = ["s3", "hdfs", "sftp", "ftp", "ftps", "webdav", "samba"]
DB_CONNECTION_TYPES = ["oracle", "postgres", "clickhouse", "hive", "mssql", "mysql"]
DB_CONNECTION_TYPES = ["oracle", "postgres", "clickhouse", "hive", "iceberg_rest_s3", "mssql", "mysql"]
8 changes: 8 additions & 0 deletions syncmaster/schemas/v1/connections/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@
ReadHiveConnectionSchema,
UpdateHiveConnectionSchema,
)
from syncmaster.schemas.v1.connections.iceberg import (
CreateIcebergConnectionSchema,
ReadIcebergConnectionSchema,
UpdateIcebergConnectionSchema,
)
from syncmaster.schemas.v1.connections.mssql import (
CreateMSSQLConnectionSchema,
ReadMSSQLConnectionSchema,
Expand Down Expand Up @@ -80,6 +85,7 @@
| CreateMSSQLConnectionSchema
| CreateClickhouseConnectionSchema
| CreateHiveConnectionSchema
| CreateIcebergConnectionSchema
| CreateHDFSConnectionSchema
| CreateS3ConnectionSchema
| CreateSFTPConnectionSchema
Expand All @@ -96,6 +102,7 @@
| ReadMSSQLConnectionSchema
| ReadClickhouseConnectionSchema
| ReadHiveConnectionSchema
| ReadIcebergConnectionSchema
| ReadHDFSConnectionSchema
| ReadS3ConnectionSchema
| ReadSFTPConnectionSchema
Expand All @@ -112,6 +119,7 @@
| UpdateMSSQLConnectionSchema
| UpdateClickhouseConnectionSchema
| UpdateHiveConnectionSchema
| UpdateIcebergConnectionSchema
| UpdateHDFSConnectionSchema
| UpdateS3ConnectionSchema
| UpdateSFTPConnectionSchema
Expand Down
65 changes: 65 additions & 0 deletions syncmaster/schemas/v1/connections/iceberg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0

from typing import Literal

from pydantic import BaseModel, Field

from syncmaster.schemas.v1.auth.iceberg_rest_basic import (
CreateIcebergRESTCatalogBasicAuthSchema,
ReadIcebergRESTCatalogBasicAuthSchema,
UpdateIcebergRESTCatalogBasicAuthSchema,
)
from syncmaster.schemas.v1.connection_types import ICEBERG_TYPE
from syncmaster.schemas.v1.connections.connection_base import (
CreateConnectionBaseSchema,
ReadConnectionBaseSchema,
)


class CreateIcebergRESTCatalogS3ConnectionDataSchema(BaseModel):
    """Non-secret connection parameters for an Iceberg REST catalog backed by S3.

    NOTE(review): this schema is currently field-for-field identical to
    ``ReadIcebergRESTCatalogS3ConnectionDataSchema``; consider sharing one
    definition if they are meant to stay in sync.
    """

    # URL of the Iceberg REST catalog (metastore) endpoint.
    metastore_url: str
    # Warehouse path inside the bucket, e.g. "/some/warehouse".
    s3_warehouse_path: str
    s3_host: str
    s3_port: int | None = None  # None presumably means the protocol default port — TODO confirm
    s3_protocol: Literal["http", "https"] = "https"
    s3_bucket: str
    s3_region: str
    # True enables path-style S3 addressing instead of virtual-hosted style.
    s3_path_style_access: bool = False


class ReadIcebergRESTCatalogS3ConnectionDataSchema(BaseModel):
    """Response schema for Iceberg REST catalog + S3 connection parameters.

    Mirrors the create schema exactly — the connection data contains no
    secrets, so nothing is stripped on read.
    """

    metastore_url: str
    s3_warehouse_path: str
    s3_host: str
    s3_port: int | None = None
    s3_protocol: Literal["http", "https"] = "https"
    s3_bucket: str
    s3_region: str
    s3_path_style_access: bool = False


class CreateIcebergConnectionSchema(CreateConnectionBaseSchema):
    """Request body for creating an Iceberg (REST catalog + S3) connection."""

    type: ICEBERG_TYPE = Field(description="Connection type")
    # Exposed in the API as "connection_data" via the alias.
    data: CreateIcebergRESTCatalogS3ConnectionDataSchema = Field(
        ...,
        alias="connection_data",
        description=(
            "Data required to connect to the database. These are the parameters that are specified in the URL request."
        ),
    )
    auth_data: CreateIcebergRESTCatalogBasicAuthSchema = Field(
        description="Credentials for authorization",
    )


class ReadIcebergConnectionSchema(ReadConnectionBaseSchema):
    """Response schema for an Iceberg connection; auth data may be hidden."""

    type: ICEBERG_TYPE
    data: ReadIcebergRESTCatalogS3ConnectionDataSchema = Field(alias="connection_data")
    # None when the caller is not allowed to see credentials — TODO confirm
    auth_data: ReadIcebergRESTCatalogBasicAuthSchema | None = None


class UpdateIcebergConnectionSchema(CreateIcebergConnectionSchema):
    """Update payload: same as create, but secrets in auth_data are optional.

    NOTE(review): ``data`` stays fully required via inheritance, so partial
    updates of connection_data are not possible — this matches the pattern of
    the other Update*ConnectionSchema classes.
    """

    auth_data: UpdateIcebergRESTCatalogBasicAuthSchema = Field(
        description="Credentials for authorization",
    )
10 changes: 5 additions & 5 deletions syncmaster/server/api/v1/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,13 @@ async def update_connection( # noqa: WPS217, WPS238

existing_credentials = await unit_of_work.credentials.read(connection_id=connection_id)
auth_data = connection_data.auth_data.model_dump()
secret_field = connection_data.auth_data.secret_field

if auth_data[secret_field] is None:
if existing_credentials["type"] != auth_data["type"]:
raise ConnectionAuthDataUpdateError
for secret_field in connection_data.auth_data.get_secret_fields():
if auth_data[secret_field] is None:
if existing_credentials["type"] != auth_data["type"]:
raise ConnectionAuthDataUpdateError

auth_data[secret_field] = existing_credentials[secret_field]
auth_data[secret_field] = existing_credentials[secret_field]

connection = await unit_of_work.connection.update(
connection_id=connection_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,16 @@ async def group_connections(
"cluster": "cluster",
},
)
elif conn_type == ConnectionType.ICEBERG:
new_data.update(
{
"metastore_url": "https://rest.domain.com",
"s3_warehouse_path": "/some/warehouse",
"s3_host": "s3.domain.com",
"s3_bucket": "bucket",
"s3_region": "us-east-1",
},
)
elif conn_type == ConnectionType.S3:
new_data.update(
{
Expand Down
2 changes: 1 addition & 1 deletion tests/test_unit/test_connections/test_create_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ async def test_check_name_field_validation_on_create_connection(
assert result.status_code == 422, result.json()
assert (
result.json()["error"]["details"][0]["message"]
== "Input tag 'POSTGRESQL' found using 'type' does not match any of the expected tags: 'oracle', 'postgres', 'mysql', 'mssql', 'clickhouse', 'hive', 'hdfs', 's3', 'sftp', 'ftp', 'ftps', 'webdav', 'samba'"
== "Input tag 'POSTGRESQL' found using 'type' does not match any of the expected tags: 'oracle', 'postgres', 'mysql', 'mssql', 'clickhouse', 'hive', 'iceberg_rest_s3', 'hdfs', 's3', 'sftp', 'ftp', 'ftps', 'webdav', 'samba'"
)


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import pytest
from httpx import AsyncClient
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from syncmaster.db.models import AuthData, Connection
from syncmaster.db.repositories.utils import decrypt_auth_data
from syncmaster.server.settings import ServerAppSettings as Settings
from tests.mocks import MockGroup, UserTestRoles

pytestmark = [pytest.mark.asyncio, pytest.mark.server, pytest.mark.iceberg]


async def test_developer_plus_can_create_iceberg_rest_s3_connection(
    client: AsyncClient,
    group: MockGroup,
    session: AsyncSession,
    settings: Settings,
    role_developer_plus: UserTestRoles,
):
    """A group member with Developer role or higher can create an Iceberg REST + S3 connection.

    Verifies the HTTP response, the persisted Connection row, and that the
    stored (encrypted) credentials decrypt back to the submitted values with
    secrets excluded from the response body.
    """
    # Arrange: pick a group member holding the parametrized role.
    user = group.get_member_of_role(role_developer_plus)

    # Act: create the connection.
    result = await client.post(
        "v1/connections",
        headers={"Authorization": f"Bearer {user.token}"},
        json={
            "group_id": group.id,
            "name": "New connection",
            "description": "",
            "type": "iceberg_rest_s3",
            "connection_data": {
                "metastore_url": "https://rest.domain.com",
                "s3_warehouse_path": "/some/warehouse",
                "s3_protocol": "http",
                "s3_host": "localhost",
                "s3_port": 9010,
                "s3_bucket": "some_bucket",
                "s3_region": "us-east-1",
                "s3_path_style_access": True,
            },
            "auth_data": {
                "type": "iceberg_rest_basic_s3_basic",
                "metastore_username": "user",
                "metastore_password": "secret",
                "s3_access_key": "access_key",
                "s3_secret_key": "secret_key",
            },
        },
    )

    # Assert the HTTP response BEFORE querying the database: if creation
    # failed, this surfaces the API error body instead of an AttributeError
    # on `connection.id` below.
    assert result.status_code == 200, result.json()

    connection = (
        await session.scalars(
            select(Connection).filter_by(
                name="New connection",
            ),
        )
    ).first()
    assert connection is not None  # guard: row must exist before dereferencing

    creds = (
        await session.scalars(
            select(AuthData).filter_by(
                connection_id=connection.id,
            ),
        )
    ).one()

    # Secrets are stored encrypted; decrypt to compare with the submitted values.
    decrypted = decrypt_auth_data(creds.value, settings=settings)
    assert result.json() == {
        "id": connection.id,
        "group_id": connection.group_id,
        "name": connection.name,
        "description": connection.description,
        "type": connection.type,
        "connection_data": {
            "metastore_url": connection.data["metastore_url"],
            "s3_warehouse_path": connection.data["s3_warehouse_path"],
            "s3_protocol": connection.data["s3_protocol"],
            "s3_host": connection.data["s3_host"],
            "s3_port": connection.data["s3_port"],
            "s3_bucket": connection.data["s3_bucket"],
            "s3_region": connection.data["s3_region"],
            "s3_path_style_access": connection.data["s3_path_style_access"],
        },
        # Response must never echo secrets back; only non-secret auth fields.
        "auth_data": {
            "type": decrypted["type"],
            "metastore_username": decrypted["metastore_username"],
            "s3_access_key": decrypted["s3_access_key"],
        },
    }
Loading
Loading