diff --git a/docs/changelog/next_release/282.feature.rst b/docs/changelog/next_release/282.feature.rst
new file mode 100644
index 00000000..f14a188e
--- /dev/null
+++ b/docs/changelog/next_release/282.feature.rst
@@ -0,0 +1 @@
+Added Iceberg Connection to API
\ No newline at end of file
diff --git a/syncmaster/db/models/connection.py b/syncmaster/db/models/connection.py
index bef73f61..8a57babc 100644
--- a/syncmaster/db/models/connection.py
+++ b/syncmaster/db/models/connection.py
@@ -18,6 +18,7 @@ class ConnectionType(StrEnum):
     POSTGRES = "postgres"
     HIVE = "hive"
+    ICEBERG = "iceberg_rest_s3"
     ORACLE = "oracle"
     CLICKHOUSE = "clickhouse"
     MSSQL = "mssql"
diff --git a/syncmaster/schemas/v1/auth/__init__.py b/syncmaster/schemas/v1/auth/__init__.py
index be2bd504..e13da280 100644
--- a/syncmaster/schemas/v1/auth/__init__.py
+++ b/syncmaster/schemas/v1/auth/__init__.py
@@ -6,6 +6,12 @@
     ReadBasicAuthSchema,
     UpdateBasicAuthSchema,
 )
+from syncmaster.schemas.v1.auth.iceberg_rest_basic import (
+    CreateIcebergRESTCatalogBasicAuthSchema,
+    IcebergRESTCatalogBasicAuthSchema,
+    ReadIcebergRESTCatalogBasicAuthSchema,
+    UpdateIcebergRESTCatalogBasicAuthSchema,
+)
 from syncmaster.schemas.v1.auth.s3 import (
     CreateS3AuthSchema,
     ReadS3AuthSchema,
@@ -35,4 +41,8 @@
     "UpdateSambaAuthSchema",
     "AuthTokenSchema",
     "TokenPayloadSchema",
+    "IcebergRESTCatalogBasicAuthSchema",
+    "CreateIcebergRESTCatalogBasicAuthSchema",
+    "ReadIcebergRESTCatalogBasicAuthSchema",
+    "UpdateIcebergRESTCatalogBasicAuthSchema",
 ]
diff --git a/syncmaster/schemas/v1/auth/basic.py b/syncmaster/schemas/v1/auth/basic.py
index 525275ec..2af7fd7e 100644
--- a/syncmaster/schemas/v1/auth/basic.py
+++ b/syncmaster/schemas/v1/auth/basic.py
@@ -21,6 +21,5 @@ class ReadBasicAuthSchema(BasicAuthSchema):
 class UpdateBasicAuthSchema(CreateBasicAuthSchema):
     password: SecretStr | None = None
 
-    @property
-    def secret_field(self) -> str:
-        return "password"
+    def get_secret_fields(self) -> tuple[str, ...]:
+        return ("password",)
diff --git a/syncmaster/schemas/v1/auth/iceberg_rest_basic.py b/syncmaster/schemas/v1/auth/iceberg_rest_basic.py
new file mode 100644
index 00000000..9e9daa67
--- /dev/null
+++ b/syncmaster/schemas/v1/auth/iceberg_rest_basic.py
@@ -0,0 +1,29 @@
+# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
+# SPDX-License-Identifier: Apache-2.0
+from typing import Literal
+
+from pydantic import BaseModel, SecretStr
+
+
+class IcebergRESTCatalogBasicAuthSchema(BaseModel):
+    type: Literal["iceberg_rest_basic_s3_basic"]
+
+
+class CreateIcebergRESTCatalogBasicAuthSchema(IcebergRESTCatalogBasicAuthSchema):
+    metastore_username: str
+    metastore_password: SecretStr
+    s3_access_key: str
+    s3_secret_key: SecretStr
+
+
+class ReadIcebergRESTCatalogBasicAuthSchema(IcebergRESTCatalogBasicAuthSchema):
+    metastore_username: str
+    s3_access_key: str
+
+
+class UpdateIcebergRESTCatalogBasicAuthSchema(CreateIcebergRESTCatalogBasicAuthSchema):
+    metastore_password: SecretStr | None = None
+    s3_secret_key: SecretStr | None = None
+
+    def get_secret_fields(self) -> tuple[str, ...]:
+        return ("metastore_password", "s3_secret_key")
diff --git a/syncmaster/schemas/v1/auth/s3.py b/syncmaster/schemas/v1/auth/s3.py
index bdf8b0ff..e0b6a3bc 100644
--- a/syncmaster/schemas/v1/auth/s3.py
+++ b/syncmaster/schemas/v1/auth/s3.py
@@ -21,6 +21,5 @@ class ReadS3AuthSchema(S3AuthSchema):
 class UpdateS3AuthSchema(CreateS3AuthSchema):
     secret_key: SecretStr | None = None
 
-    @property
-    def secret_field(self) -> str:
-        return "secret_key"
+    def get_secret_fields(self) -> tuple[str, ...]:
+        return ("secret_key",)
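The auth-schema hunks in this diff replace the single-valued `secret_field` property with `get_secret_fields()`, which returns a tuple: the new Iceberg REST + S3 credentials carry two independent secrets, while the existing basic, S3, and Samba schemas carry one. A minimal standalone sketch of the new contract (hypothetical class names, not the actual SyncMaster schemas; assumes pydantic v2):

```python
from pydantic import BaseModel, SecretStr


class UpdateS3AuthSketch(BaseModel):
    # None on update means "keep the currently stored secret"
    secret_key: SecretStr | None = None

    def get_secret_fields(self) -> tuple[str, ...]:
        return ("secret_key",)


class UpdateIcebergAuthSketch(BaseModel):
    metastore_password: SecretStr | None = None
    s3_secret_key: SecretStr | None = None

    def get_secret_fields(self) -> tuple[str, ...]:
        # two independent secrets: REST catalog password and S3 secret key
        return ("metastore_password", "s3_secret_key")


for schema in (UpdateS3AuthSketch(), UpdateIcebergAuthSketch()):
    print(type(schema).__name__, schema.get_secret_fields())
```

Single-secret schemas return a one-element tuple, so the `update_connection` loop later in this diff handles every auth type uniformly.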
...]: + return ("secret_key",) diff --git a/syncmaster/schemas/v1/auth/samba.py b/syncmaster/schemas/v1/auth/samba.py index e8074f36..3de4f8fd 100644 --- a/syncmaster/schemas/v1/auth/samba.py +++ b/syncmaster/schemas/v1/auth/samba.py @@ -23,6 +23,5 @@ class ReadSambaAuthSchema(SambaAuthSchema): class UpdateSambaAuthSchema(CreateSambaAuthSchema): password: SecretStr | None = None - @property - def secret_field(self) -> str: - return "password" + def get_secret_fields(self) -> tuple[str, ...]: + return ("password",) diff --git a/syncmaster/schemas/v1/connection_types.py b/syncmaster/schemas/v1/connection_types.py index dd0269bc..ad728c0e 100644 --- a/syncmaster/schemas/v1/connection_types.py +++ b/syncmaster/schemas/v1/connection_types.py @@ -3,6 +3,7 @@ from typing import Literal HIVE_TYPE = Literal["hive"] +ICEBERG_TYPE = Literal["iceberg_rest_s3"] ORACLE_TYPE = Literal["oracle"] POSTGRES_TYPE = Literal["postgres"] CLICKHOUSE_TYPE = Literal["clickhouse"] @@ -21,6 +22,7 @@ "postgres", "clickhouse", "hive", + "iceberg_rest_s3", "mssql", "mysql", "s3", @@ -32,4 +34,4 @@ "samba", ] FILE_CONNECTION_TYPES = ["s3", "hdfs", "sftp", "ftp", "ftps", "webdav", "samba"] -DB_CONNECTION_TYPES = ["oracle", "postgres", "clickhouse", "hive", "mssql", "mysql"] +DB_CONNECTION_TYPES = ["oracle", "postgres", "clickhouse", "hive", "iceberg_rest_s3", "mssql", "mysql"] diff --git a/syncmaster/schemas/v1/connections/connection.py b/syncmaster/schemas/v1/connections/connection.py index ad13a6b6..5b3a1936 100644 --- a/syncmaster/schemas/v1/connections/connection.py +++ b/syncmaster/schemas/v1/connections/connection.py @@ -30,6 +30,11 @@ ReadHiveConnectionSchema, UpdateHiveConnectionSchema, ) +from syncmaster.schemas.v1.connections.iceberg import ( + CreateIcebergConnectionSchema, + ReadIcebergConnectionSchema, + UpdateIcebergConnectionSchema, +) from syncmaster.schemas.v1.connections.mssql import ( CreateMSSQLConnectionSchema, ReadMSSQLConnectionSchema, @@ -80,6 +85,7 @@ | CreateMSSQLConnectionSchema | CreateClickhouseConnectionSchema | CreateHiveConnectionSchema + | CreateIcebergConnectionSchema | CreateHDFSConnectionSchema | CreateS3ConnectionSchema | CreateSFTPConnectionSchema @@ -96,6 +102,7 @@ | ReadMSSQLConnectionSchema | ReadClickhouseConnectionSchema | ReadHiveConnectionSchema + | ReadIcebergConnectionSchema | ReadHDFSConnectionSchema | ReadS3ConnectionSchema | ReadSFTPConnectionSchema @@ -112,6 +119,7 @@ | UpdateMSSQLConnectionSchema | UpdateClickhouseConnectionSchema | UpdateHiveConnectionSchema + | UpdateIcebergConnectionSchema | UpdateHDFSConnectionSchema | UpdateS3ConnectionSchema | UpdateSFTPConnectionSchema diff --git a/syncmaster/schemas/v1/connections/iceberg.py b/syncmaster/schemas/v1/connections/iceberg.py new file mode 100644 index 00000000..f75b6ba3 --- /dev/null +++ b/syncmaster/schemas/v1/connections/iceberg.py @@ -0,0 +1,65 @@ +# SPDX-FileCopyrightText: 2023-2024 MTS PJSC +# SPDX-License-Identifier: Apache-2.0 + +from typing import Literal + +from pydantic import BaseModel, Field + +from syncmaster.schemas.v1.auth.iceberg_rest_basic import ( + CreateIcebergRESTCatalogBasicAuthSchema, + ReadIcebergRESTCatalogBasicAuthSchema, + UpdateIcebergRESTCatalogBasicAuthSchema, +) +from syncmaster.schemas.v1.connection_types import ICEBERG_TYPE +from syncmaster.schemas.v1.connections.connection_base import ( + CreateConnectionBaseSchema, + ReadConnectionBaseSchema, +) + + +class CreateIcebergRESTCatalogS3ConnectionDataSchema(BaseModel): + metastore_url: str + s3_warehouse_path: str + s3_host: str + 
diff --git a/syncmaster/server/api/v1/connections.py b/syncmaster/server/api/v1/connections.py
index 6a698cdb..5e634037 100644
--- a/syncmaster/server/api/v1/connections.py
+++ b/syncmaster/server/api/v1/connections.py
@@ -209,13 +209,13 @@ async def update_connection(  # noqa: WPS217, WPS238
     existing_credentials = await unit_of_work.credentials.read(connection_id=connection_id)
     auth_data = connection_data.auth_data.model_dump()
 
-    secret_field = connection_data.auth_data.secret_field
-    if auth_data[secret_field] is None:
-        if existing_credentials["type"] != auth_data["type"]:
-            raise ConnectionAuthDataUpdateError
+    for secret_field in connection_data.auth_data.get_secret_fields():
+        if auth_data[secret_field] is None:
+            if existing_credentials["type"] != auth_data["type"]:
+                raise ConnectionAuthDataUpdateError
 
-        auth_data[secret_field] = existing_credentials[secret_field]
+            auth_data[secret_field] = existing_credentials[secret_field]
 
     connection = await unit_of_work.connection.update(
         connection_id=connection_id,
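The hunk above generalizes the secret-preservation logic from one secret to many: for each field named by `get_secret_fields()`, a `None` in the request body means "reuse the stored secret", which is only allowed when the auth type is unchanged. A plain-Python sketch of those semantics, with `ValueError` standing in for `ConnectionAuthDataUpdateError`:

```python
def merge_auth_data(
    auth_data: dict,
    existing_credentials: dict,
    secret_fields: tuple[str, ...],
) -> dict:
    for secret_field in secret_fields:
        if auth_data[secret_field] is None:
            # a secret omitted from the request can only be reused
            # if the auth type did not change
            if existing_credentials["type"] != auth_data["type"]:
                raise ValueError("auth type changed, secrets must be re-sent")
            auth_data[secret_field] = existing_credentials[secret_field]
    return auth_data


existing = {
    "type": "iceberg_rest_basic_s3_basic",
    "metastore_password": "old_password",
    "s3_secret_key": "old_key",
}
update = {
    "type": "iceberg_rest_basic_s3_basic",
    "metastore_password": "new_password",
    "s3_secret_key": None,  # omitted from the PUT body
}
merged = merge_auth_data(update, existing, ("metastore_password", "s3_secret_key"))
assert merged["metastore_password"] == "new_password"
assert merged["s3_secret_key"] == "old_key"
```

This is exactly the behavior `test_update_iceberg_connection.py` below relies on: its PUT body omits `s3_secret_key`, and the stored value is kept.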
( result.json()["error"]["details"][0]["message"] - == "Input tag 'POSTGRESQL' found using 'type' does not match any of the expected tags: 'oracle', 'postgres', 'mysql', 'mssql', 'clickhouse', 'hive', 'hdfs', 's3', 'sftp', 'ftp', 'ftps', 'webdav', 'samba'" + == "Input tag 'POSTGRESQL' found using 'type' does not match any of the expected tags: 'oracle', 'postgres', 'mysql', 'mssql', 'clickhouse', 'hive', 'iceberg_rest_s3', 'hdfs', 's3', 'sftp', 'ftp', 'ftps', 'webdav', 'samba'" ) diff --git a/tests/test_unit/test_connections/test_db_connection/test_create_iceberg_connection.py b/tests/test_unit/test_connections/test_db_connection/test_create_iceberg_connection.py new file mode 100644 index 00000000..e3981b6f --- /dev/null +++ b/tests/test_unit/test_connections/test_db_connection/test_create_iceberg_connection.py @@ -0,0 +1,89 @@ +import pytest +from httpx import AsyncClient +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from syncmaster.db.models import AuthData, Connection +from syncmaster.db.repositories.utils import decrypt_auth_data +from syncmaster.server.settings import ServerAppSettings as Settings +from tests.mocks import MockGroup, UserTestRoles + +pytestmark = [pytest.mark.asyncio, pytest.mark.server, pytest.mark.iceberg] + + +async def test_developer_plus_can_create_iceberg_rest_s3_connection( + client: AsyncClient, + group: MockGroup, + session: AsyncSession, + settings: Settings, + role_developer_plus: UserTestRoles, +): + user = group.get_member_of_role(role_developer_plus) + + result = await client.post( + "v1/connections", + headers={"Authorization": f"Bearer {user.token}"}, + json={ + "group_id": group.id, + "name": "New connection", + "description": "", + "type": "iceberg_rest_s3", + "connection_data": { + "metastore_url": "https://rest.domain.com", + "s3_warehouse_path": "/some/warehouse", + "s3_protocol": "http", + "s3_host": "localhost", + "s3_port": 9010, + "s3_bucket": "some_bucket", + "s3_region": "us-east-1", + "s3_path_style_access": True, + }, + "auth_data": { + "type": "iceberg_rest_basic_s3_basic", + "metastore_username": "user", + "metastore_password": "secret", + "s3_access_key": "access_key", + "s3_secret_key": "secret_key", + }, + }, + ) + connection = ( + await session.scalars( + select(Connection).filter_by( + name="New connection", + ), + ) + ).first() + + creds = ( + await session.scalars( + select(AuthData).filter_by( + connection_id=connection.id, + ), + ) + ).one() + + decrypted = decrypt_auth_data(creds.value, settings=settings) + assert result.status_code == 200, result.json() + assert result.json() == { + "id": connection.id, + "group_id": connection.group_id, + "name": connection.name, + "description": connection.description, + "type": connection.type, + "connection_data": { + "metastore_url": connection.data["metastore_url"], + "s3_warehouse_path": connection.data["s3_warehouse_path"], + "s3_protocol": connection.data["s3_protocol"], + "s3_host": connection.data["s3_host"], + "s3_port": connection.data["s3_port"], + "s3_bucket": connection.data["s3_bucket"], + "s3_region": connection.data["s3_region"], + "s3_path_style_access": connection.data["s3_path_style_access"], + }, + "auth_data": { + "type": decrypted["type"], + "metastore_username": decrypted["metastore_username"], + "s3_access_key": decrypted["s3_access_key"], + }, + } diff --git a/tests/test_unit/test_connections/test_db_connection/test_update_iceberg_connection.py 
diff --git a/tests/test_unit/test_connections/test_db_connection/test_update_iceberg_connection.py b/tests/test_unit/test_connections/test_db_connection/test_update_iceberg_connection.py
new file mode 100644
index 00000000..50765274
--- /dev/null
+++ b/tests/test_unit/test_connections/test_db_connection/test_update_iceberg_connection.py
@@ -0,0 +1,89 @@
+import pytest
+from httpx import AsyncClient
+
+from tests.mocks import MockConnection, UserTestRoles
+from tests.test_unit.utils import fetch_connection_json
+
+pytestmark = [pytest.mark.asyncio, pytest.mark.server, pytest.mark.iceberg]
+
+
+@pytest.mark.parametrize(
+    "connection_type,create_connection_data,create_connection_auth_data",
+    [
+        (
+            "iceberg_rest_s3",
+            {
+                "metastore_url": "http://domain.com:8000",
+                "s3_warehouse_path": "/some/warehouse",
+                "s3_protocol": "http",
+                "s3_host": "localhost",
+                "s3_port": 9010,
+                "s3_bucket": "some_bucket",
+                "s3_region": "us-east-1",
+                "s3_path_style_access": True,
+            },
+            {
+                "type": "iceberg_rest_basic_s3_basic",
+                "metastore_username": "user",
+                "metastore_password": "secret",
+                "s3_access_key": "access_key",
+                "s3_secret_key": "secret_key",
+            },
+        ),
+    ],
+    indirect=["create_connection_data", "create_connection_auth_data"],
+)
+async def test_developer_plus_can_update_iceberg_rest_s3_connection(
+    client: AsyncClient,
+    group_connection: MockConnection,
+    role_developer_plus: UserTestRoles,
+):
+    user = group_connection.owner_group.get_member_of_role(role_developer_plus)
+    connection_json = await fetch_connection_json(client, user.token, group_connection)
+
+    result = await client.put(
+        f"v1/connections/{group_connection.id}",
+        headers={"Authorization": f"Bearer {user.token}"},
+        json={
+            **connection_json,
+            "type": group_connection.type,
+            "connection_data": {
+                "metastore_url": "http://rest.domain.com:8000",
+                "s3_warehouse_path": "/some/new/warehouse",
+                "s3_protocol": "https",
+                "s3_host": "s3.domain.com",
+                "s3_bucket": "new_bucket",
+                "s3_region": "us-east-2",
+            },
+            "auth_data": {
+                "type": "iceberg_rest_basic_s3_basic",
+                "metastore_username": "new_user",
+                "metastore_password": "new_password",
+                "s3_access_key": "new_access_key",
+            },
+        },
+    )
+
+    assert result.status_code == 200, result.json()
+    assert result.json() == {
+        "id": group_connection.id,
+        "name": group_connection.name,
+        "description": group_connection.description,
+        "group_id": group_connection.group_id,
+        "type": group_connection.type,
+        "connection_data": {
+            "metastore_url": "http://rest.domain.com:8000",
+            "s3_warehouse_path": "/some/new/warehouse",
+            "s3_protocol": "https",
+            "s3_host": "s3.domain.com",
+            "s3_port": None,
+            "s3_bucket": "new_bucket",
+            "s3_region": "us-east-2",
+            "s3_path_style_access": False,
+        },
+        "auth_data": {
+            "type": group_connection.credentials.value["type"],
+            "metastore_username": "new_user",
+            "s3_access_key": "new_access_key",
+        },
+    }
"oracle", "clickhouse", "mssql", @@ -311,7 +312,7 @@ async def test_search_connections_with_nonexistent_query( "samba", ], }, - 13, + 14, ), ], ids=[ diff --git a/tests/test_unit/utils.py b/tests/test_unit/utils.py index 1489c7fd..b31bfe5e 100644 --- a/tests/test_unit/utils.py +++ b/tests/test_unit/utils.py @@ -275,6 +275,9 @@ async def fetch_connection_json(client: AsyncClient, user_token: str, mock_conne auth_data["password"] = mock_connection.credentials.value["password"] elif auth_data["type"] == "s3": auth_data["secret_key"] = mock_connection.credentials.value["secret_key"] + elif auth_data["type"] == "iceberg_rest_basic_s3_basic": + auth_data["metastore_password"] = mock_connection.credentials.value["metastore_password"] + auth_data["s3_secret_key"] = mock_connection.credentials.value["s3_secret_key"] return connection_json