8 changes: 8 additions & 0 deletions .env.docker.test
@@ -51,6 +51,14 @@ TEST_HIVE_CLUSTER=test-hive
TEST_HIVE_USER=syncmaster
TEST_HIVE_PASSWORD=123UsedForTestOnly@!

TEST_ICEBERG_METASTORE_URL_FOR_CONFTEST=http://test-iceberg-rest:8181
TEST_ICEBERG_METASTORE_URL_FOR_WORKER=http://test-iceberg-rest:8181
TEST_ICEBERG_METASTORE_USERNAME=syncmaster
TEST_ICEBERG_METASTORE_PASSWORD=123UsedForTestOnly@!
TEST_ICEBERG_S3_WAREHOUSE_PATH=/data
TEST_ICEBERG_S3_REGION=us-east-1
TEST_ICEBERG_S3_PATH_STYLE_ACCESS=True

TEST_HDFS_HOST=test-hive
TEST_HDFS_WEBHDFS_PORT=9870
TEST_HDFS_IPC_PORT=9820
8 changes: 8 additions & 0 deletions .env.local.test
@@ -51,6 +51,14 @@ export TEST_HIVE_CLUSTER=test-hive
export TEST_HIVE_USER=syncmaster
export TEST_HIVE_PASSWORD=123UsedForTestOnly@!

export TEST_ICEBERG_METASTORE_URL_FOR_CONFTEST=http://localhost:8181
export TEST_ICEBERG_METASTORE_URL_FOR_WORKER=http://test-iceberg-rest:8181
export TEST_ICEBERG_METASTORE_USERNAME=syncmaster
export TEST_ICEBERG_METASTORE_PASSWORD=123UsedForTestOnly@!
export TEST_ICEBERG_S3_WAREHOUSE_PATH=/data
export TEST_ICEBERG_S3_REGION=us-east-1
export TEST_ICEBERG_S3_PATH_STYLE_ACCESS=True

export TEST_HDFS_HOST=test-hive
export TEST_HDFS_WEBHDFS_PORT=9870
export TEST_HDFS_IPC_PORT=9820
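Note the split between the two metastore URLs: in `.env.local.test`, conftest code runs on the host and reaches the REST catalog through the published port (`http://localhost:8181`), while the worker resolves the compose service name (`http://test-iceberg-rest:8181`). In `.env.docker.test` both sides run inside the compose network, so both variables point at the service name.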
80 changes: 80 additions & 0 deletions .github/workflows/iceberg-tests.yml
@@ -0,0 +1,80 @@
name: Iceberg Tests
on:
  workflow_call:

env:
  DEFAULT_PYTHON: '3.13'

jobs:
  tests:
    name: Run Iceberg tests
    runs-on: ubuntu-latest

    steps:
      - name: Checkout code
        uses: actions/checkout@v5

      - name: Set up QEMU
        uses: docker/setup-qemu-action@v3

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3

      - name: Cache jars
        uses: actions/cache@v4
        with:
          path: ./cached_jars
          key: ${{ runner.os }}-python-${{ env.DEFAULT_PYTHON }}-test-iceberg
          restore-keys: |
            ${{ runner.os }}-python-${{ env.DEFAULT_PYTHON }}-test-iceberg
            ${{ runner.os }}-python-

      - name: Build Worker Image
        uses: docker/build-push-action@v6
        with:
          context: .
          tags: mtsrus/syncmaster-worker:${{ github.sha }}
          target: test
          file: docker/Dockerfile.worker
          load: true
          cache-from: type=gha,scope=test
          cache-to: type=gha,scope=test,mode=max

      - name: Docker compose up
        run: |
          docker compose -f docker-compose.test.yml --profile all down -v --remove-orphans
          docker compose -f docker-compose.test.yml --profile iceberg up -d --wait --wait-timeout 200
        env:
          WORKER_IMAGE_TAG: ${{ github.sha }}

      - name: Run Iceberg Tests
        run: |
          docker compose -f ./docker-compose.test.yml --profile iceberg exec -T worker coverage run -m pytest -vvv -s -m "worker and iceberg"

      - name: Dump worker logs on failure
        if: failure()
        uses: jwalton/gh-docker-logs@v2
        with:
          images: bitnamilegacy/minio,mtsrus/syncmaster-worker,mtsrus/horizon-backend,postgres,rabbitmq
          dest: ./logs

      # This is important, as coverage is exported after receiving SIGTERM
      - name: Shutdown
        if: always()
        run: |
          docker compose -f docker-compose.test.yml --profile all down -v --remove-orphans

      - name: Upload worker logs
        uses: actions/upload-artifact@v5
        if: failure()
        with:
          name: worker-logs-iceberg
          path: logs/*

      - name: Upload coverage results
        uses: actions/upload-artifact@v5
        with:
          name: coverage-iceberg
          path: reports/*
          # https://github.com/actions/upload-artifact/issues/602
          include-hidden-files: true
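The dedicated Shutdown step exists because `coverage run` only writes its data file when the worker process exits cleanly, and `docker compose down` delivers SIGTERM first. A minimal sketch of the general pattern (illustrative only; the actual worker may rely on Celery's built-in warm shutdown rather than a handler like this):

```python
import signal
import sys

def _graceful_exit(signum, frame):
    # sys.exit() raises SystemExit, so atexit hooks -- including the one
    # coverage.py registers to flush its .coverage data file -- still run.
    # A hard SIGKILL would skip them and lose the coverage report.
    sys.exit(0)

# `docker compose down` sends SIGTERM before escalating to SIGKILL;
# turning it into a clean interpreter exit preserves the report.
signal.signal(signal.SIGTERM, _graceful_exit)
```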
2 changes: 1 addition & 1 deletion .github/workflows/s3-tests.yml
@@ -49,7 +49,7 @@ jobs:

      - name: Run S3 Tests
        run: |
-          docker compose -f ./docker-compose.test.yml --profile s3 exec -T worker coverage run -m pytest -vvv -s -m "worker and s3"
+          docker compose -f ./docker-compose.test.yml --profile s3 exec -T worker coverage run -m pytest -vvv -s -m "worker and s3 and not iceberg"

      - name: Dump worker logs on failure
        if: failure()
5 changes: 5 additions & 0 deletions .github/workflows/tests.yml
@@ -46,6 +46,10 @@ jobs:
    name: Hive tests
    uses: ./.github/workflows/hive-tests.yml

  iceberg_tests:
    name: Iceberg tests
    uses: ./.github/workflows/iceberg-tests.yml

  s3_tests:
    name: S3 tests
    uses: ./.github/workflows/s3-tests.yml

@@ -90,6 +94,7 @@
      - ftps_tests
      - hdfs_tests
      - hive_tests
      - iceberg_tests
      - mssql_tests
      - mysql_tests
      - oracle_tests
4 changes: 4 additions & 0 deletions Makefile
@@ -89,6 +89,10 @@ test-integration-hive: test-db ##@Test Run integration tests for Hive
	docker compose -f docker-compose.test.yml --profile hive up -d --wait $(DOCKER_COMPOSE_ARGS)
	${POETRY} run pytest ./tests/test_integration -m hive $(PYTEST_ARGS)

test-integration-iceberg: test-db ##@Test Run integration tests for Iceberg
	docker compose -f docker-compose.test.yml --profile iceberg up -d --wait $(DOCKER_COMPOSE_ARGS)
	${POETRY} run pytest ./tests/test_integration -m iceberg $(PYTEST_ARGS)

test-integration-clickhouse: test-db ##@Test Run integration tests for Clickhouse
	docker compose -f docker-compose.test.yml --profile clickhouse up -d --wait $(DOCKER_COMPOSE_ARGS)
	${POETRY} run pytest ./tests/test_integration -m clickhouse $(PYTEST_ARGS)
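As with the neighboring targets, a local run is just `make test-integration-iceberg`: it starts the `iceberg` compose profile (REST catalog plus MinIO, defined in `docker-compose.test.yml` below) and then runs only the tests marked `iceberg`.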
1 change: 1 addition & 0 deletions README.rst
@@ -36,6 +36,7 @@ List of currently supported connections:
* Apache Hive
* Clickhouse
* Postgres
* Iceberg (REST Catalog + S3)
* Oracle
* MSSQL
* MySQL
29 changes: 24 additions & 5 deletions docker-compose.test.yml
@@ -124,7 +124,7 @@ services:
        condition: service_completed_successfully
      rabbitmq:
        condition: service_healthy
-    profiles: [worker, scheduler, s3, hdfs, hive, oracle, clickhouse, mysql, mssql, sftp, ftp, ftps, samba, webdav, all]
+    profiles: [worker, scheduler, s3, hdfs, hive, iceberg, oracle, clickhouse, mysql, mssql, sftp, ftp, ftps, samba, webdav, all]

  horizon:
    image: mtsrus/horizon-backend:develop

@@ -147,7 +147,7 @@
    depends_on:
      horizon-db:
        condition: service_healthy
-    profiles: [horizon, s3, hdfs, hive, oracle, clickhouse, mysql, mssql, sftp, ftp, ftps, samba, webdav, all]
+    profiles: [horizon, s3, hdfs, hive, iceberg, oracle, clickhouse, mysql, mssql, sftp, ftp, ftps, samba, webdav, all]

  horizon-db:
    image: postgres:17

@@ -167,7 +167,7 @@
      interval: 30s
      timeout: 5s
      retries: 3
-    profiles: [horizon, s3, hdfs, hive, oracle, clickhouse, mysql, mssql, sftp, ftp, ftps, samba, webdav, all]
+    profiles: [horizon, s3, hdfs, hive, iceberg, oracle, clickhouse, mysql, mssql, sftp, ftp, ftps, samba, webdav, all]

  test-postgres:
    image: postgres:17

@@ -185,7 +185,7 @@
      interval: 30s
      timeout: 5s
      retries: 3
-    profiles: [s3, hdfs, hive, oracle, clickhouse, mysql, mssql, sftp, ftp, ftps, samba, webdav, all]
+    profiles: [s3, hdfs, hive, iceberg, oracle, clickhouse, mysql, mssql, sftp, ftp, ftps, samba, webdav, all]

  test-s3:
    image: bitnamilegacy/minio:latest

@@ -204,7 +204,26 @@
      interval: 30s
      timeout: 5s
      retries: 3
-    profiles: [s3, all]
+    profiles: [s3, iceberg, all]

  test-iceberg-rest:
    image: tabulario/iceberg-rest:latest
    container_name: test-iceberg-rest
    restart: unless-stopped
    environment:
      CATALOG_WAREHOUSE: s3a://syncmaster/data/
      CATALOG_IO__IMPL: org.apache.iceberg.aws.s3.S3FileIO
      CATALOG_S3_ENDPOINT: http://test-s3:9000
      CATALOG_S3_PATH__STYLE__ACCESS: true
      AWS_ACCESS_KEY_ID: syncmaster
      AWS_SECRET_ACCESS_KEY: 123UsedForTestOnly@!
      AWS_REGION: us-east-1
    ports:
      - 8181:8181
    depends_on:
      test-s3:
        condition: service_healthy
    profiles: [iceberg, all]

  test-oracle:
    image: gvenzl/oracle-xe:slim-faststart
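A quick way to verify the catalog service is reachable is to query it directly from the host. A hedged smoke-test sketch using pyiceberg (illustrative only; the test suite itself drives the catalog through onetl/Spark, and the `localhost` endpoints assume the published port `8181` above plus MinIO's default `9000`):

```python
from pyiceberg.catalog import load_catalog

# Credentials mirror the compose environment above; this is a host-side
# smoke test, not part of the actual test suite.
catalog = load_catalog(
    "test",
    **{
        "type": "rest",
        "uri": "http://localhost:8181",
        "s3.endpoint": "http://localhost:9000",
        "s3.access-key-id": "syncmaster",
        "s3.secret-access-key": "123UsedForTestOnly@!",
        "s3.region": "us-east-1",
    },
)
print(catalog.list_namespaces())  # empty on a fresh warehouse
```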
2 changes: 1 addition & 1 deletion poetry.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions syncmaster/worker/handlers/db/iceberg.py
@@ -53,6 +53,8 @@ def connect(self, spark: SparkSession):

    @slot
    def read(self) -> DataFrame:
        table = f"{self.transfer_dto.catalog_name}.{self.transfer_dto.table_name}"
        self.connection.spark.catalog.refreshTable(table)
        return super().read()

    @slot
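The added `refreshTable` guards against Spark's per-session relation cache: an Iceberg snapshot committed by another writer is otherwise invisible to a session that has already touched the table. A minimal illustration of the behavior (catalog and table names are hypothetical):

```python
# First access pins the table's current snapshot in Spark's relation cache:
df_before = spark.table("test_catalog.db.events")

# ... another process commits a new Iceberg snapshot to the same table ...

# Invalidating the cached metadata makes the next read resolve the latest snapshot:
spark.catalog.refreshTable("test_catalog.db.events")
df_after = spark.table("test_catalog.db.events")
```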
18 changes: 17 additions & 1 deletion syncmaster/worker/spark.py
@@ -49,7 +49,15 @@ def get_worker_spark_session(

def get_packages(connection_types: set[str]) -> list[str]:  # noqa: WPS212
    import pyspark
-    from onetl.connection import MSSQL, Clickhouse, MySQL, Oracle, Postgres, SparkS3
+    from onetl.connection import (
+        MSSQL,
+        Clickhouse,
+        Iceberg,
+        MySQL,
+        Oracle,
+        Postgres,
+        SparkS3,
+    )
    from onetl.file.format import XML, Excel

    spark_version = pyspark.__version__
@@ -75,6 +83,14 @@ def get_packages(connection_types: set[str]) -> list[str]:  # noqa: WPS212
    if connection_types & {"s3", "all"}:
        result.extend(SparkS3.get_packages(spark_version=spark_version))

    if connection_types & {"iceberg_rest_s3", "all"}:
        result.extend(
            [
                *Iceberg.get_packages(package_version="1.10.0", spark_version=spark_version),
                *Iceberg.S3Warehouse.get_packages(package_version="1.10.0"),
            ],
        )

    if connection_types & {"s3", "hdfs", "sftp", "ftp", "ftps", "samba", "webdav", "all"}:
        result.extend(file_formats_spark_packages)
8 changes: 8 additions & 0 deletions tests/settings.py
@@ -50,6 +50,14 @@ class TestSettings(BaseSettings):
    TEST_HIVE_USER: str
    TEST_HIVE_PASSWORD: str

    TEST_ICEBERG_METASTORE_URL_FOR_CONFTEST: str
    TEST_ICEBERG_METASTORE_URL_FOR_WORKER: str
    TEST_ICEBERG_METASTORE_USERNAME: str
    TEST_ICEBERG_METASTORE_PASSWORD: str
    TEST_ICEBERG_S3_WAREHOUSE_PATH: str
    TEST_ICEBERG_S3_REGION: str
    TEST_ICEBERG_S3_PATH_STYLE_ACCESS: bool = True

    TEST_HDFS_HOST: str
    TEST_HDFS_WEBHDFS_PORT: int
    TEST_HDFS_IPC_PORT: int
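A small sketch of how these fields resolve (standard pydantic `BaseSettings` behavior: each attribute is populated from the environment variable of the same name, so the values below assume `.env.local.test` has been sourced):

```python
from tests.settings import TestSettings

settings = TestSettings()
assert settings.TEST_ICEBERG_S3_PATH_STYLE_ACCESS is True  # parsed from the string "True"
assert settings.TEST_ICEBERG_METASTORE_URL_FOR_CONFTEST == "http://localhost:8181"
```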
@@ -61,6 +61,12 @@
    hive_connection,
    prepare_hive,
)
from tests.test_integration.test_run_transfer.connection_fixtures.iceberg_fixtures import (
    iceberg_rest_s3_connection,
    iceberg_rest_s3_for_conftest,
    iceberg_rest_s3_for_worker,
    prepare_iceberg_rest_s3,
)
from tests.test_integration.test_run_transfer.connection_fixtures.mssql_fixtures import (
    mssql_connection,
    mssql_for_conftest,
@@ -57,6 +57,7 @@ def dataframe_columns_filter_transformations(source_type: str):
"mysql": "CHAR",
"mssql": "VARCHAR(30)",
"hive": "VARCHAR(30)",
"iceberg_rest_s3": "STRING",
"s3": "STRING",
"hdfs": "STRING",
}