diff --git a/.env.docker.test b/.env.docker.test
index 81355aca..e6bfa108 100644
--- a/.env.docker.test
+++ b/.env.docker.test
@@ -51,6 +51,14 @@ TEST_HIVE_CLUSTER=test-hive
 TEST_HIVE_USER=syncmaster
 TEST_HIVE_PASSWORD=123UsedForTestOnly@!
 
+TEST_ICEBERG_METASTORE_URL_FOR_CONFTEST=http://test-iceberg-rest:8181
+TEST_ICEBERG_METASTORE_URL_FOR_WORKER=http://test-iceberg-rest:8181
+TEST_ICEBERG_METASTORE_USERNAME=syncmaster
+TEST_ICEBERG_METASTORE_PASSWORD=123UsedForTestOnly@!
+TEST_ICEBERG_S3_WAREHOUSE_PATH=/data
+TEST_ICEBERG_S3_REGION=us-east-1
+TEST_ICEBERG_S3_PATH_STYLE_ACCESS=True
+
 TEST_HDFS_HOST=test-hive
 TEST_HDFS_WEBHDFS_PORT=9870
 TEST_HDFS_IPC_PORT=9820
diff --git a/.env.local.test b/.env.local.test
index c0a5e224..e123efbc 100644
--- a/.env.local.test
+++ b/.env.local.test
@@ -51,6 +51,14 @@ export TEST_HIVE_CLUSTER=test-hive
 export TEST_HIVE_USER=syncmaster
 export TEST_HIVE_PASSWORD=123UsedForTestOnly@!
 
+export TEST_ICEBERG_METASTORE_URL_FOR_CONFTEST=http://localhost:8181
+export TEST_ICEBERG_METASTORE_URL_FOR_WORKER=http://test-iceberg-rest:8181
+export TEST_ICEBERG_METASTORE_USERNAME=syncmaster
+export TEST_ICEBERG_METASTORE_PASSWORD=123UsedForTestOnly@!
+export TEST_ICEBERG_S3_WAREHOUSE_PATH=/data
+export TEST_ICEBERG_S3_REGION=us-east-1
+export TEST_ICEBERG_S3_PATH_STYLE_ACCESS=True
+
 export TEST_HDFS_HOST=test-hive
 export TEST_HDFS_WEBHDFS_PORT=9870
 export TEST_HDFS_IPC_PORT=9820
diff --git a/.github/workflows/iceberg-tests.yml b/.github/workflows/iceberg-tests.yml
new file mode 100644
index 00000000..de6be645
--- /dev/null
+++ b/.github/workflows/iceberg-tests.yml
@@ -0,0 +1,80 @@
+name: Iceberg Tests
+on:
+  workflow_call:
+
+env:
+  DEFAULT_PYTHON: '3.13'
+
+jobs:
+  tests:
+    name: Run Iceberg tests
+    runs-on: ubuntu-latest
+
+    steps:
+      - name: Checkout code
+        uses: actions/checkout@v5
+
+      - name: Set up QEMU
+        uses: docker/setup-qemu-action@v3
+
+      - name: Set up Docker Buildx
+        uses: docker/setup-buildx-action@v3
+
+      - name: Cache jars
+        uses: actions/cache@v4
+        with:
+          path: ./cached_jars
+          key: ${{ runner.os }}-python-${{ env.DEFAULT_PYTHON }}-test-iceberg
+          restore-keys: |
+            ${{ runner.os }}-python-${{ env.DEFAULT_PYTHON }}-test-iceberg
+            ${{ runner.os }}-python-
+
+      - name: Build Worker Image
+        uses: docker/build-push-action@v6
+        with:
+          context: .
+          tags: mtsrus/syncmaster-worker:${{ github.sha }}
+          target: test
+          file: docker/Dockerfile.worker
+          load: true
+          cache-from: type=gha,scope=test
+          cache-to: type=gha,scope=test,mode=max
+
+      - name: Docker compose up
+        run: |
+          docker compose -f docker-compose.test.yml --profile all down -v --remove-orphans
+          docker compose -f docker-compose.test.yml --profile iceberg up -d --wait --wait-timeout 200
+        env:
+          WORKER_IMAGE_TAG: ${{ github.sha }}
+
+      - name: Run Iceberg Tests
+        run: |
+          docker compose -f ./docker-compose.test.yml --profile iceberg exec -T worker coverage run -m pytest -vvv -s -m "worker and iceberg"
+
+      - name: Dump worker logs on failure
+        if: failure()
+        uses: jwalton/gh-docker-logs@v2
+        with:
+          images: bitnamilegacy/minio,mtsrus/syncmaster-worker,mtsrus/horizon-backend,postgres,rabbitmq
+          dest: ./logs
+
+      # This is important, as coverage is exported after receiving SIGTERM
+      - name: Shutdown
+        if: always()
+        run: |
+          docker compose -f docker-compose.test.yml --profile all down -v --remove-orphans
+
+      - name: Upload worker logs
+        uses: actions/upload-artifact@v5
+        if: failure()
+        with:
+          name: worker-logs-iceberg
+          path: logs/*
+
+      - name: Upload coverage results
+        uses: actions/upload-artifact@v5
+        with:
+          name: coverage-iceberg
+          path: reports/*
+          # https://github.com/actions/upload-artifact/issues/602
+          include-hidden-files: true
diff --git a/.github/workflows/s3-tests.yml b/.github/workflows/s3-tests.yml
index e284f662..d33fb4b5 100644
--- a/.github/workflows/s3-tests.yml
+++ b/.github/workflows/s3-tests.yml
@@ -49,7 +49,7 @@ jobs:
 
       - name: Run S3 Tests
         run: |
-          docker compose -f ./docker-compose.test.yml --profile s3 exec -T worker coverage run -m pytest -vvv -s -m "worker and s3"
+          docker compose -f ./docker-compose.test.yml --profile s3 exec -T worker coverage run -m pytest -vvv -s -m "worker and s3 and not iceberg"
 
       - name: Dump worker logs on failure
         if: failure()
diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index d7c4c459..fb17a8b9 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -46,6 +46,10 @@ jobs:
     name: Hive tests
     uses: ./.github/workflows/hive-tests.yml
 
+  iceberg_tests:
+    name: Iceberg tests
+    uses: ./.github/workflows/iceberg-tests.yml
+
   s3_tests:
     name: S3 tests
     uses: ./.github/workflows/s3-tests.yml
@@ -90,6 +94,7 @@ jobs:
     - ftps_tests
     - hdfs_tests
     - hive_tests
+    - iceberg_tests
    - mssql_tests
     - mysql_tests
     - oracle_tests
diff --git a/Makefile b/Makefile
index 7b000ada..b5d04da8 100644
--- a/Makefile
+++ b/Makefile
@@ -89,6 +89,10 @@ test-integration-hive: test-db ##@Test Run integration tests for Hive
 	docker compose -f docker-compose.test.yml --profile hive up -d --wait $(DOCKER_COMPOSE_ARGS)
 	${POETRY} run pytest ./tests/test_integration -m hive $(PYTEST_ARGS)
 
+test-integration-iceberg: test-db ##@Test Run integration tests for Iceberg
+	docker compose -f docker-compose.test.yml --profile iceberg up -d --wait $(DOCKER_COMPOSE_ARGS)
+	${POETRY} run pytest ./tests/test_integration -m iceberg $(PYTEST_ARGS)
+
 test-integration-clickhouse: test-db ##@Test Run integration tests for Clickhouse
 	docker compose -f docker-compose.test.yml --profile clickhouse up -d --wait $(DOCKER_COMPOSE_ARGS)
 	${POETRY} run pytest ./tests/test_integration -m clickhouse $(PYTEST_ARGS)
diff --git a/README.rst b/README.rst
index 09c9ca00..62ed6e42 100644
--- a/README.rst
+++ b/README.rst
@@ -36,6 +36,7 @@ List of currently supported connections:
 * Apache Hive
 * Clickhouse
 * Postgres
+* Iceberg (REST Catalog + S3)
 * Oracle
 * MSSQL
 * MySQL
diff --git a/docker-compose.test.yml b/docker-compose.test.yml
index 966ad092..b6a2a35c 100644
--- a/docker-compose.test.yml
+++ b/docker-compose.test.yml
@@ -124,7 +124,7 @@ services:
         condition: service_completed_successfully
       rabbitmq:
         condition: service_healthy
-    profiles: [worker, scheduler, s3, hdfs, hive, oracle, clickhouse, mysql, mssql, sftp, ftp, ftps, samba, webdav, all]
+    profiles: [worker, scheduler, s3, hdfs, hive, iceberg, oracle, clickhouse, mysql, mssql, sftp, ftp, ftps, samba, webdav, all]
 
   horizon:
     image: mtsrus/horizon-backend:develop
@@ -147,7 +147,7 @@ services:
     depends_on:
       horizon-db:
         condition: service_healthy
-    profiles: [horizon, s3, hdfs, hive, oracle, clickhouse, mysql, mssql, sftp, ftp, ftps, samba, webdav, all]
+    profiles: [horizon, s3, hdfs, hive, iceberg, oracle, clickhouse, mysql, mssql, sftp, ftp, ftps, samba, webdav, all]
 
   horizon-db:
     image: postgres:17
@@ -167,7 +167,7 @@ services:
       interval: 30s
       timeout: 5s
       retries: 3
-    profiles: [horizon, s3, hdfs, hive, oracle, clickhouse, mysql, mssql, sftp, ftp, ftps, samba, webdav, all]
+    profiles: [horizon, s3, hdfs, hive, iceberg, oracle, clickhouse, mysql, mssql, sftp, ftp, ftps, samba, webdav, all]
 
   test-postgres:
     image: postgres:17
@@ -185,7 +185,7 @@ services:
       interval: 30s
       timeout: 5s
       retries: 3
-    profiles: [s3, hdfs, hive, oracle, clickhouse, mysql, mssql, sftp, ftp, ftps, samba, webdav, all]
+    profiles: [s3, hdfs, hive, iceberg, oracle, clickhouse, mysql, mssql, sftp, ftp, ftps, samba, webdav, all]
 
   test-s3:
     image: bitnamilegacy/minio:latest
@@ -204,7 +204,26 @@ services:
       interval: 30s
       timeout: 5s
       retries: 3
-    profiles: [s3, all]
+    profiles: [s3, iceberg, all]
+
+  test-iceberg-rest:
+    image: tabulario/iceberg-rest:latest
+    container_name: test-iceberg-rest
+    restart: unless-stopped
+    environment:
+      CATALOG_WAREHOUSE: s3a://syncmaster/data/
+      CATALOG_IO__IMPL: org.apache.iceberg.aws.s3.S3FileIO
+      CATALOG_S3_ENDPOINT: http://test-s3:9000
+      CATALOG_S3_PATH__STYLE__ACCESS: true
+      AWS_ACCESS_KEY_ID: syncmaster
+      AWS_SECRET_ACCESS_KEY: 123UsedForTestOnly@!
+      AWS_REGION: us-east-1
+    ports:
+      - 8181:8181
+    depends_on:
+      test-s3:
+        condition: service_healthy
+    profiles: [iceberg, all]
 
   test-oracle:
     image: gvenzl/oracle-xe:slim-faststart
diff --git a/poetry.lock b/poetry.lock
index 77b9396d..ba4a5a03 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -2249,7 +2249,7 @@ webdav = ["webdavclient3"]
 type = "git"
 url = "https://github.com/MobileTeleSystems/onetl.git"
 reference = "develop"
-resolved_reference = "a1b7b5a0389ed668365cf57161ded6aa12556957"
+resolved_reference = "371acd14adc9b698fc7daa8d8669b162528e33f9"
 
 [[package]]
 name = "ordered-set"
diff --git a/syncmaster/worker/handlers/db/iceberg.py b/syncmaster/worker/handlers/db/iceberg.py
index 80e37531..04ad0776 100644
--- a/syncmaster/worker/handlers/db/iceberg.py
+++ b/syncmaster/worker/handlers/db/iceberg.py
@@ -53,6 +53,8 @@ def connect(self, spark: SparkSession):
 
     @slot
     def read(self) -> DataFrame:
+        table = f"{self.transfer_dto.catalog_name}.{self.transfer_dto.table_name}"
+        self.connection.spark.catalog.refreshTable(table)
         return super().read()
 
     @slot
diff --git a/syncmaster/worker/spark.py b/syncmaster/worker/spark.py
index 192b4a08..8f7c1e98 100644
--- a/syncmaster/worker/spark.py
+++ b/syncmaster/worker/spark.py
@@ -49,7 +49,15 @@ def get_worker_spark_session(
 
 def get_packages(connection_types: set[str]) -> list[str]:  # noqa: WPS212
     import pyspark
-    from onetl.connection import MSSQL, Clickhouse, MySQL, Oracle, Postgres, SparkS3
+    from onetl.connection import (
+        MSSQL,
+        Clickhouse,
+        Iceberg,
+        MySQL,
+        Oracle,
+        Postgres,
+        SparkS3,
+    )
     from onetl.file.format import XML, Excel
 
     spark_version = pyspark.__version__
@@ -75,6 +83,14 @@ def get_packages(connection_types: set[str]) -> list[str]:  # noqa: WPS212
     if connection_types & {"s3", "all"}:
         result.extend(SparkS3.get_packages(spark_version=spark_version))
 
+    if connection_types & {"iceberg_rest_s3", "all"}:
+        result.extend(
+            [
+                *Iceberg.get_packages(package_version="1.10.0", spark_version=spark_version),
+                *Iceberg.S3Warehouse.get_packages(package_version="1.10.0"),
+            ],
+        )
+
     if connection_types & {"s3", "hdfs", "sftp", "ftp", "ftps", "samba", "webdav", "all"}:
         result.extend(file_formats_spark_packages)
 
diff --git a/tests/settings.py b/tests/settings.py
index bd5813a9..3aef509d 100644
--- a/tests/settings.py
+++ b/tests/settings.py
@@ -50,6 +50,14 @@ class TestSettings(BaseSettings):
     TEST_HIVE_USER: str
     TEST_HIVE_PASSWORD: str
 
+    TEST_ICEBERG_METASTORE_URL_FOR_CONFTEST: str
+    TEST_ICEBERG_METASTORE_URL_FOR_WORKER: str
+    TEST_ICEBERG_METASTORE_USERNAME: str
+    TEST_ICEBERG_METASTORE_PASSWORD: str
+    TEST_ICEBERG_S3_WAREHOUSE_PATH: str
+    TEST_ICEBERG_S3_REGION: str
+    TEST_ICEBERG_S3_PATH_STYLE_ACCESS: bool = True
+
     TEST_HDFS_HOST: str
     TEST_HDFS_WEBHDFS_PORT: int
     TEST_HDFS_IPC_PORT: int
diff --git a/tests/test_integration/test_run_transfer/connection_fixtures/__init__.py b/tests/test_integration/test_run_transfer/connection_fixtures/__init__.py
index 5d40df2a..b6ffabef 100644
--- a/tests/test_integration/test_run_transfer/connection_fixtures/__init__.py
+++ b/tests/test_integration/test_run_transfer/connection_fixtures/__init__.py
@@ -61,6 +61,12 @@
     hive_connection,
     prepare_hive,
 )
+from tests.test_integration.test_run_transfer.connection_fixtures.iceberg_fixtures import (
+    iceberg_rest_s3_connection,
+    iceberg_rest_s3_for_conftest,
+    iceberg_rest_s3_for_worker,
+    prepare_iceberg_rest_s3,
+)
 from tests.test_integration.test_run_transfer.connection_fixtures.mssql_fixtures import (
     mssql_connection,
     mssql_for_conftest,
diff --git a/tests/test_integration/test_run_transfer/connection_fixtures/filters_fixtures.py b/tests/test_integration/test_run_transfer/connection_fixtures/filters_fixtures.py
index 516e4432..62dd00a2 100644
--- a/tests/test_integration/test_run_transfer/connection_fixtures/filters_fixtures.py
+++ b/tests/test_integration/test_run_transfer/connection_fixtures/filters_fixtures.py
@@ -57,6 +57,7 @@ def dataframe_columns_filter_transformations(source_type: str):
         "mysql": "CHAR",
         "mssql": "VARCHAR(30)",
         "hive": "VARCHAR(30)",
+        "iceberg_rest_s3": "STRING",
         "s3": "STRING",
         "hdfs": "STRING",
     }
diff --git a/tests/test_integration/test_run_transfer/connection_fixtures/iceberg_fixtures.py b/tests/test_integration/test_run_transfer/connection_fixtures/iceberg_fixtures.py
new file mode 100644
index 00000000..0bf73889
--- /dev/null
+++ b/tests/test_integration/test_run_transfer/connection_fixtures/iceberg_fixtures.py
@@ -0,0 +1,157 @@
+import logging
+import secrets
+
+import pytest
+import pytest_asyncio
+from onetl.connection import S3, Iceberg
+from onetl.db import DBWriter
+from pyspark.sql import DataFrame, SparkSession
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from syncmaster.db.models import Group
+from syncmaster.dto.connections import IcebergRESTCatalogS3ConnectionDTO
+from syncmaster.server.settings import ServerAppSettings as Settings
+from tests.settings import TestSettings
+from tests.test_unit.utils import create_connection, create_credentials
+
+logger = logging.getLogger(__name__)
+
+
+@pytest.fixture(
+    scope="session",
+    params=[pytest.param("iceberg", marks=[pytest.mark.iceberg])],
+)
+def iceberg_rest_s3_for_conftest(test_settings: TestSettings) -> IcebergRESTCatalogS3ConnectionDTO:
+    return IcebergRESTCatalogS3ConnectionDTO(
+        metastore_url=test_settings.TEST_ICEBERG_METASTORE_URL_FOR_CONFTEST,
+        s3_warehouse_path=test_settings.TEST_ICEBERG_S3_WAREHOUSE_PATH,
+        s3_host=test_settings.TEST_S3_HOST_FOR_CONFTEST,
+        s3_port=test_settings.TEST_S3_PORT_FOR_CONFTEST,
+        s3_protocol=test_settings.TEST_S3_PROTOCOL,
+        s3_bucket=test_settings.TEST_S3_BUCKET,
+        s3_region=test_settings.TEST_ICEBERG_S3_REGION,
+        s3_path_style_access=test_settings.TEST_ICEBERG_S3_PATH_STYLE_ACCESS,
+        s3_access_key=test_settings.TEST_S3_ACCESS_KEY,
+        s3_secret_key=test_settings.TEST_S3_SECRET_KEY,
+        metastore_username=test_settings.TEST_ICEBERG_METASTORE_USERNAME,
+        metastore_password=test_settings.TEST_ICEBERG_METASTORE_PASSWORD,
+    )
+
+
+@pytest.fixture(
+    scope="session",
+    params=[pytest.param("iceberg", marks=[pytest.mark.iceberg])],
+)
+def iceberg_rest_s3_for_worker(test_settings: TestSettings) -> IcebergRESTCatalogS3ConnectionDTO:
+    return IcebergRESTCatalogS3ConnectionDTO(
+        metastore_url=test_settings.TEST_ICEBERG_METASTORE_URL_FOR_WORKER,
+        s3_warehouse_path=test_settings.TEST_ICEBERG_S3_WAREHOUSE_PATH,
+        s3_host=test_settings.TEST_S3_HOST_FOR_WORKER,
+        s3_port=test_settings.TEST_S3_PORT_FOR_WORKER,
+        s3_protocol=test_settings.TEST_S3_PROTOCOL,
+        s3_bucket=test_settings.TEST_S3_BUCKET,
+        s3_region=test_settings.TEST_ICEBERG_S3_REGION,
+        s3_path_style_access=test_settings.TEST_ICEBERG_S3_PATH_STYLE_ACCESS,
+        s3_access_key=test_settings.TEST_S3_ACCESS_KEY,
+        s3_secret_key=test_settings.TEST_S3_SECRET_KEY,
+        metastore_username=test_settings.TEST_ICEBERG_METASTORE_USERNAME,
+        metastore_password=test_settings.TEST_ICEBERG_METASTORE_PASSWORD,
+    )
+
+
+@pytest.fixture
+def prepare_iceberg_rest_s3(
+    spark: SparkSession,
+    iceberg_rest_s3_for_conftest: IcebergRESTCatalogS3ConnectionDTO,
+    s3_file_connection: S3,
+):
+    iceberg = iceberg_rest_s3_for_conftest
+    catalog_name = "iceberg_rest_s3"
+    namespace = "default"
+    source_table = f"{catalog_name}.{namespace}.source_table"
+    target_table = f"{catalog_name}.{namespace}.target_table"
+
+    connection = Iceberg(
+        spark=spark,
+        catalog_name=catalog_name,
+        catalog=Iceberg.RESTCatalog(
+            uri=iceberg.metastore_url,
+            auth=Iceberg.RESTCatalog.BasicAuth(
+                user=iceberg.metastore_username,
+                password=iceberg.metastore_password,
+            ),
+        ),
+        warehouse=Iceberg.S3Warehouse(
+            path=iceberg.s3_warehouse_path,
+            host=iceberg.s3_host,
+            port=iceberg.s3_port,
+            protocol=iceberg.s3_protocol,
+            bucket=iceberg.s3_bucket,
+            path_style_access=iceberg.s3_path_style_access,
+            region=iceberg.s3_region,
+            access_key=iceberg.s3_access_key,
+            secret_key=iceberg.s3_secret_key,
+        ),
+    ).check()
+
+    connection.execute(f"DROP TABLE IF EXISTS {source_table}")
+    connection.execute(f"DROP TABLE IF EXISTS {target_table}")
+    connection.execute(f"CREATE NAMESPACE IF NOT EXISTS {catalog_name}.{namespace}")
+
+    def fill_with_data(df: DataFrame):
+        logger.info("START PREPARE ICEBERG")
+        db_writer = DBWriter(
+            connection=connection,
+            target=f"{namespace}.source_table",
+        )
+        db_writer.run(df)
+        spark.catalog.refreshTable(source_table)
+        logger.info("END PREPARE ICEBERG")
+
+    yield connection, fill_with_data
+
+    connection.execute(f"DROP TABLE IF EXISTS {source_table}")
+    connection.execute(f"DROP TABLE IF EXISTS {target_table}")
+
+
+@pytest_asyncio.fixture
+async def iceberg_rest_s3_connection(
+    iceberg_rest_s3_for_worker: IcebergRESTCatalogS3ConnectionDTO,
+    settings: Settings,
+    session: AsyncSession,
+    group: Group,
+):
+    iceberg = iceberg_rest_s3_for_worker
+    result = await create_connection(
+        session=session,
+        name=secrets.token_hex(5),
+        type=iceberg.type,
+        data=dict(
+            metastore_url=iceberg.metastore_url,
+            s3_warehouse_path=iceberg.s3_warehouse_path,
+            s3_host=iceberg.s3_host,
+            s3_port=iceberg.s3_port,
+            s3_protocol=iceberg.s3_protocol,
+            s3_bucket=iceberg.s3_bucket,
+            s3_region=iceberg.s3_region,
+            s3_path_style_access=iceberg.s3_path_style_access,
+        ),
+        group_id=group.id,
+    )
+
+    await create_credentials(
+        session=session,
+        settings=settings,
+        connection_id=result.id,
+        auth_data=dict(
+            type="iceberg_rest_basic_s3_basic",
+            s3_access_key=iceberg.s3_access_key,
+            s3_secret_key=iceberg.s3_secret_key,
+            metastore_username=iceberg.metastore_username,
+            metastore_password=iceberg.metastore_password,
+        ),
+    )
+
+    yield result
+    await session.delete(result)
+    await session.commit()
diff --git a/tests/test_integration/test_run_transfer/connection_fixtures/spark_fixtures.py b/tests/test_integration/test_run_transfer/connection_fixtures/spark_fixtures.py
index c5bc75a7..df855257 100644
--- a/tests/test_integration/test_run_transfer/connection_fixtures/spark_fixtures.py
+++ b/tests/test_integration/test_run_transfer/connection_fixtures/spark_fixtures.py
@@ -2,7 +2,15 @@
 
 import pyspark
 import pytest
-from onetl.connection import MSSQL, Clickhouse, MySQL, Oracle, Postgres, SparkS3
+from onetl.connection import (
+    MSSQL,
+    Clickhouse,
+    Iceberg,
+    MySQL,
+    Oracle,
+    Postgres,
+    SparkS3,
+)
 from onetl.file.format import XML, Excel
 from pyspark.sql import SparkSession
 from pytest import FixtureRequest
@@ -26,12 +34,14 @@ def spark(settings: Settings, request: FixtureRequest) -> SparkSession:
 
     spark = (
         SparkSession.builder.appName("celery_worker")
-        .enableHiveSupport()
         .config("spark.sql.pyspark.jvmStacktrace.enabled", "true")
         .config("spark.driver.host", "localhost")
         .config("spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
     )
 
+    if "hive" in markers:
+        spark = spark.enableHiveSupport()
+
     if "postgres" in markers:
         maven_packages.extend(Postgres.get_packages())
 
@@ -67,6 +77,14 @@ def spark(settings: Settings, request: FixtureRequest) -> SparkSession:
             )
         )
 
+    if "iceberg" in markers:
+        maven_packages.extend(
+            [
+                *Iceberg.get_packages(package_version="1.10.0", spark_version="3.5"),
+                *Iceberg.S3Warehouse.get_packages(package_version="1.10.0"),
+            ],
+        )
+
     if set(markers).intersection({"hdfs", "s3", "sftp", "ftp", "ftps", "samba", "webdav"}):
         # excel version is hardcoded due to https://github.com/nightscape/spark-excel/issues/902
         file_formats_spark_packages: list[str] = [
diff --git a/tests/test_integration/test_run_transfer/test_iceberg.py b/tests/test_integration/test_run_transfer/test_iceberg.py
new file mode 100644
index 00000000..ea4167e2
--- /dev/null
+++ b/tests/test_integration/test_run_transfer/test_iceberg.py
@@ -0,0 +1,373 @@
+import secrets
+
+import pytest
+import pytest_asyncio
+from httpx import AsyncClient
+from onetl.db import DBReader
+from pyspark.sql import DataFrame, SparkSession
+from pytest_lazy_fixtures import lf
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from syncmaster.db.models import Connection, Group, Queue, Transfer
+from tests.mocks import MockUser
+from tests.test_unit.utils import create_transfer
+from tests.utils import (
+    cast_dataframe_types,
+    run_transfer_and_verify,
+    split_df,
+)
+
+pytestmark = [pytest.mark.asyncio, pytest.mark.worker]
+
+
+@pytest_asyncio.fixture
+async def postgres_to_iceberg_rest_s3(
+    session: AsyncSession,
+    group: Group,
+    queue: Queue,
+    iceberg_rest_s3_connection: Connection,
+    postgres_connection: Connection,
+    strategy: dict,
+    transformations: list[dict],
+):
+    result = await create_transfer(
+        session=session,
+        group_id=group.id,
+        name=f"postgres_to_iceberg_rest_s3_{secrets.token_hex(5)}",
+        source_connection_id=postgres_connection.id,
+        target_connection_id=iceberg_rest_s3_connection.id,
+        source_params={
+            "type": "postgres",
+            "table_name": "public.source_table",
+        },
+        target_params={
+            "type": "iceberg_rest_s3",
+            "table_name": "default.target_table",
+            "catalog_name": "iceberg_rest_s3",
+        },
+        strategy_params=strategy,
+        transformations=transformations,
+        queue_id=queue.id,
+    )
+    yield result
+    await session.delete(result)
+    await session.commit()
+
+
+@pytest_asyncio.fixture
+async def iceberg_rest_s3_to_postgres(
+    session: AsyncSession,
+    group: Group,
+    queue: Queue,
+    iceberg_rest_s3_connection: Connection,
+    postgres_connection: Connection,
+    strategy: dict,
+    transformations: list[dict],
+):
+    result = await create_transfer(
+        session=session,
+        group_id=group.id,
+        name=f"iceberg_rest_s3_to_postgres_{secrets.token_hex(5)}",
+        source_connection_id=iceberg_rest_s3_connection.id,
+        target_connection_id=postgres_connection.id,
+        source_params={
+            "type": "iceberg_rest_s3",
+            "table_name": "default.source_table",
+            "catalog_name": "iceberg_rest_s3",
+        },
+        target_params={
+            "type": "postgres",
+            "table_name": "public.target_table",
+        },
+        strategy_params=strategy,
+        transformations=transformations,
+        queue_id=queue.id,
+    )
+    yield result
+    await session.delete(result)
+    await session.commit()
+
+
+@pytest.mark.parametrize(
+    "strategy, transformations",
+    [
+        (
+            lf("full_strategy"),
+            [],
+        ),
+    ],
+)
+async def test_run_transfer_postgres_to_iceberg_rest_s3_with_full_strategy(
+    client: AsyncClient,
+    group_owner: MockUser,
+    prepare_postgres,
+    prepare_iceberg_rest_s3,
+    init_df: DataFrame,
+    postgres_to_iceberg_rest_s3: Transfer,
+    strategy,
+    transformations,
+):
+    _, fill_with_data = prepare_postgres
+    fill_with_data(init_df)
+    iceberg, _ = prepare_iceberg_rest_s3
+
+    await run_transfer_and_verify(
+        client,
+        group_owner,
+        postgres_to_iceberg_rest_s3.id,
+        target_auth="iceberg_rest_basic_s3_basic",
+    )
+
+    reader = DBReader(
+        connection=iceberg,
+        table="default.target_table",
+    )
+    df = reader.run()
+
+    df, init_df = cast_dataframe_types(df, init_df)
+    assert df.sort("ID").collect() == init_df.sort("ID").collect()
+
+
+@pytest.mark.parametrize(
+    "strategy, transformations",
+    [
+        (
+            lf("full_strategy"),
+            [],
+        ),
+    ],
+)
+async def test_run_transfer_postgres_to_iceberg_rest_s3_mixed_naming_with_full_strategy(
+    client: AsyncClient,
+    group_owner: MockUser,
+    prepare_postgres,
+    prepare_iceberg_rest_s3,
+    init_df_with_mixed_column_naming: DataFrame,
+    postgres_to_iceberg_rest_s3: Transfer,
+    strategy,
+    transformations,
+):
+    _, fill_with_data = prepare_postgres
+    fill_with_data(init_df_with_mixed_column_naming)
+    iceberg, _ = prepare_iceberg_rest_s3
+
+    await run_transfer_and_verify(
+        client,
+        group_owner,
+        postgres_to_iceberg_rest_s3.id,
+        target_auth="iceberg_rest_basic_s3_basic",
+    )
+
+    reader = DBReader(
+        connection=iceberg,
+        table="default.target_table",
+    )
+    df = reader.run()
+
+    assert df.columns != init_df_with_mixed_column_naming.columns
+    assert df.columns == [column.lower() for column in init_df_with_mixed_column_naming.columns]
+
+    df, init_df_with_mixed_column_naming = cast_dataframe_types(df, init_df_with_mixed_column_naming)
+    assert df.sort("ID").collect() == init_df_with_mixed_column_naming.sort("ID").collect()
+
+
+@pytest.mark.parametrize(
+    "strategy, transformations",
+    [
+        (
+            lf("incremental_strategy_by_number_column"),
+            [],
+        ),
+    ],
+)
+async def test_run_transfer_postgres_to_iceberg_rest_s3_with_incremental_strategy(
+    spark: SparkSession,
+    client: AsyncClient,
+    group_owner: MockUser,
+    prepare_postgres,
+    prepare_iceberg_rest_s3,
+    init_df: DataFrame,
+    postgres_to_iceberg_rest_s3: Transfer,
+    strategy,
+    transformations,
+):
+    _, fill_with_data = prepare_postgres
+    iceberg, _ = prepare_iceberg_rest_s3
+
+    first_transfer_df, second_transfer_df = split_df(df=init_df, ratio=0.6, keep_sorted_by="number")
+    fill_with_data(first_transfer_df)
+    await run_transfer_and_verify(
+        client,
+        group_owner,
+        postgres_to_iceberg_rest_s3.id,
+        target_auth="iceberg_rest_basic_s3_basic",
+    )
+
+    reader = DBReader(
+        connection=iceberg,
+        table="default.target_table",
+    )
+    df = reader.run()
+
+    df, first_transfer_df = cast_dataframe_types(df, first_transfer_df)
+    assert df.sort("ID").collect() == first_transfer_df.sort("ID").collect()
+
+    fill_with_data(second_transfer_df)
+    await run_transfer_and_verify(
+        client,
+        group_owner,
+        postgres_to_iceberg_rest_s3.id,
+        target_auth="iceberg_rest_basic_s3_basic",
+    )
+
+    spark.catalog.refreshTable("iceberg_rest_s3.default.target_table")
+    df_with_increment = reader.run()
+    df_with_increment, init_df = cast_dataframe_types(df_with_increment, init_df)
+    assert df_with_increment.sort("ID").collect() == init_df.sort("ID").collect()
+
+
+@pytest.mark.parametrize(
+    "source_type, strategy, transformations, expected_filter",
+    [
+        (
+            "iceberg_rest_s3",
+            lf("full_strategy"),
+            lf("dataframe_rows_filter_transformations"),
+            lf("expected_dataframe_rows_filter"),
+        ),
+        (
+            "iceberg_rest_s3",
+            lf("full_strategy"),
+            lf("dataframe_columns_filter_transformations"),
+            lf("expected_dataframe_columns_filter"),
+        ),
+    ],
+)
+async def test_run_transfer_iceberg_rest_s3_to_postgres_with_full_strategy(
+    client: AsyncClient,
+    group_owner: MockUser,
+    prepare_iceberg_rest_s3,
+    prepare_postgres,
+    init_df: DataFrame,
+    iceberg_rest_s3_to_postgres: Transfer,
+    source_type,
+    strategy,
+    transformations,
+    expected_filter,
+):
+    _, fill_with_data = prepare_iceberg_rest_s3
+    fill_with_data(init_df)
+    postgres, _ = prepare_postgres
+    init_df = expected_filter(init_df, source_type)
+
+    await run_transfer_and_verify(
+        client,
+        group_owner,
+        iceberg_rest_s3_to_postgres.id,
+        source_auth="iceberg_rest_basic_s3_basic",
+    )
+
+    reader = DBReader(
+        connection=postgres,
+        table="public.target_table",
+    )
+    df = reader.run()
+
+    df, init_df = cast_dataframe_types(df, init_df)
+    assert df.sort("ID").collect() == init_df.sort("ID").collect()
+
+
+@pytest.mark.parametrize(
+    "strategy, transformations",
+    [
+        (
+            lf("full_strategy"),
+            [],
+        ),
+    ],
+)
+async def test_run_transfer_iceberg_rest_s3_to_postgres_mixes_naming_with_full_strategy(
+    client: AsyncClient,
+    group_owner: MockUser,
+    prepare_iceberg_rest_s3,
+    prepare_postgres,
+    init_df_with_mixed_column_naming: DataFrame,
+    iceberg_rest_s3_to_postgres: Transfer,
+    strategy,
+    transformations,
+):
+    _, fill_with_data = prepare_iceberg_rest_s3
+    fill_with_data(init_df_with_mixed_column_naming)
+    postgres, _ = prepare_postgres
+
+    await run_transfer_and_verify(
+        client,
+        group_owner,
+        iceberg_rest_s3_to_postgres.id,
+        source_auth="iceberg_rest_basic_s3_basic",
+    )
+
+    reader = DBReader(
+        connection=postgres,
+        table="public.target_table",
+    )
+    df = reader.run()
+
+    assert df.columns != init_df_with_mixed_column_naming.columns
+    assert df.columns == [column.lower() for column in init_df_with_mixed_column_naming.columns]
+
+    df, init_df_with_mixed_column_naming = cast_dataframe_types(df, init_df_with_mixed_column_naming)
+    assert df.sort("ID").collect() == init_df_with_mixed_column_naming.sort("ID").collect()
+
+
+@pytest.mark.parametrize(
+    "strategy, transformations",
+    [
+        (
+            lf("incremental_strategy_by_number_column"),
+            [],
+        ),
+    ],
+)
+async def test_run_transfer_iceberg_rest_s3_to_postgres_with_incremental_strategy(
+    client: AsyncClient,
+    group_owner: MockUser,
+    prepare_iceberg_rest_s3,
+    prepare_postgres,
+    init_df: DataFrame,
+    iceberg_rest_s3_to_postgres: Transfer,
+    strategy,
+    transformations,
+):
+    _, fill_with_data = prepare_iceberg_rest_s3
+    postgres, _ = prepare_postgres
+
+    first_transfer_df, second_transfer_df = split_df(df=init_df, ratio=0.6, keep_sorted_by="number")
+    fill_with_data(first_transfer_df)
+    await run_transfer_and_verify(
+        client,
+        group_owner,
+        iceberg_rest_s3_to_postgres.id,
+        source_auth="iceberg_rest_basic_s3_basic",
+    )
+
+    reader = DBReader(
+        connection=postgres,
+        table="public.target_table",
+    )
+    df = reader.run()
+
+    df, first_transfer_df = cast_dataframe_types(df, first_transfer_df)
+    assert df.sort("ID").collect() == first_transfer_df.sort("ID").collect()
+
+    fill_with_data(second_transfer_df)
+    await run_transfer_and_verify(
+        client,
+        group_owner,
+        iceberg_rest_s3_to_postgres.id,
+        source_auth="iceberg_rest_basic_s3_basic",
+    )
+
+    df_with_increment = reader.run()
+    df_with_increment, init_df = cast_dataframe_types(df_with_increment, init_df)
+    assert df_with_increment.sort("ID").collect() == init_df.sort("ID").collect()
diff --git a/tests/test_integration/test_run_transfer/test_mssql.py b/tests/test_integration/test_run_transfer/test_mssql.py
index 9d8c18ab..127a591b 100644
--- a/tests/test_integration/test_run_transfer/test_mssql.py
+++ b/tests/test_integration/test_run_transfer/test_mssql.py
@@ -216,7 +216,7 @@ async def test_run_transfer_postgres_to_mssql_with_incremental_strategy(
     df_with_increment = reader.run()
     df_with_increment, init_df = truncate_datetime_to_seconds(df_with_increment, init_df)
     df_with_increment, init_df = cast_dataframe_types(df_with_increment, init_df)
-    assert df.sort("ID").collect() == init_df.sort("ID").collect()
+    assert df_with_increment.sort("ID").collect() == init_df.sort("ID").collect()
 
 
 @pytest.mark.parametrize(
diff --git a/tests/utils.py b/tests/utils.py
index fe4a1c2a..31844b40 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -133,22 +133,21 @@ async def get_run_on_end(
 
 
 def verify_transfer_auth_data(run_data: dict[str, Any], source_auth: str, target_auth: str) -> None:
-    source_auth_data = run_data["transfer_dump"]["source_connection"]["auth_data"]
-    target_auth_data = run_data["transfer_dump"]["target_connection"]["auth_data"]
-
-    if source_auth == "s3":
-        assert source_auth_data["access_key"]
-        assert "secret_key" not in source_auth_data
-    else:
-        assert source_auth_data["user"]
-        assert "password" not in source_auth_data
-
-    if target_auth == "s3":
-        assert target_auth_data["access_key"]
-        assert "secret_key" not in target_auth_data
-    else:
-        assert target_auth_data["user"]
-        assert "password" not in target_auth_data
+    for auth_type, auth_data in [
+        (source_auth, run_data["transfer_dump"]["source_connection"]["auth_data"]),
+        (target_auth, run_data["transfer_dump"]["target_connection"]["auth_data"]),
+    ]:
+        if auth_type == "s3":
+            assert auth_data["access_key"]
+            assert "secret_key" not in auth_data
+        elif auth_type == "iceberg_rest_basic_s3_basic":
+            assert auth_data["s3_access_key"]
+            assert auth_data["metastore_username"]
+            assert "s3_secret_key" not in auth_data
+            assert "metastore_password" not in auth_data
+        else:
+            assert auth_data["user"]
+            assert "password" not in auth_data
 
 
 async def run_transfer_and_verify(