From d96a99934d3432e53b41860426c184900a508f62 Mon Sep 17 00:00:00 2001 From: Jasper Ginn Date: Sun, 20 Oct 2024 21:27:30 +0200 Subject: [PATCH] chore: renames --- .../src/dagster_pyiceberg/handler.py | 12 ++++++------ tests/dagster_pyiceberg/test_handler.py | 14 +++++++------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/packages/dagster_pyiceberg/src/dagster_pyiceberg/handler.py b/packages/dagster_pyiceberg/src/dagster_pyiceberg/handler.py index 5d8260c..2811c55 100644 --- a/packages/dagster_pyiceberg/src/dagster_pyiceberg/handler.py +++ b/packages/dagster_pyiceberg/src/dagster_pyiceberg/handler.py @@ -153,7 +153,7 @@ def supported_types(self) -> Sequence[Type[object]]: return (pa.Table, pa.RecordBatchReader) -class IcebergToDagsterPartitionMapper: +class PartitionMapper: def __init__( self, @@ -264,7 +264,7 @@ def dagster_time_partitions(self) -> List[TablePartitionDimension]: return time_partitions @property - def updated_time_partition_field(self) -> str | None: + def updated_dagster_time_partition_field(self) -> str | None: """If time partitions present, check whether these have been updated. This happens when users change e.g. from an hourly to a daily partition.""" # The assumption is that even a multi-partitioned table will have only one time partition @@ -300,7 +300,7 @@ def updated(self) -> List[TablePartitionDimension]: return [ p for p in self.get_table_slice_partition_dimensions() - if p.partition_expr == self.updated_time_partition_field + if p.partition_expr == self.updated_dagster_time_partition_field ] def deleted(self) -> List[partitioning.PartitionField]: @@ -316,7 +316,7 @@ class IcebergTableSpecUpdater: def __init__( self, - partition_mapping: IcebergToDagsterPartitionMapper, + partition_mapping: PartitionMapper, partition_spec_update_mode: str, ): self.partition_spec_update_mode = partition_spec_update_mode @@ -476,7 +476,7 @@ def _table_writer( # But this should be a configuration option per table if partition_dimensions is not None: IcebergTableSpecUpdater( - partition_mapping=IcebergToDagsterPartitionMapper( + partition_mapping=PartitionMapper( table_slice=table_slice, iceberg_table_schema=table.schema(), iceberg_partition_spec=table.spec(), @@ -495,7 +495,7 @@ def _table_writer( # TODO: add updates and deletes if partition_dimensions is not None: IcebergTableSpecUpdater( - partition_mapping=IcebergToDagsterPartitionMapper( + partition_mapping=PartitionMapper( table_slice=table_slice, iceberg_table_schema=table.schema(), iceberg_partition_spec=table.spec(), diff --git a/tests/dagster_pyiceberg/test_handler.py b/tests/dagster_pyiceberg/test_handler.py index 9a11991..ac8f248 100644 --- a/tests/dagster_pyiceberg/test_handler.py +++ b/tests/dagster_pyiceberg/test_handler.py @@ -492,7 +492,7 @@ def test_iceberg_to_dagster_partition_mapper_new_fields( ), ], ) - new_partitions = handler.IcebergToDagsterPartitionMapper( + new_partitions = handler.PartitionMapper( iceberg_table_schema=iceberg_table_schema, iceberg_partition_spec=spec, table_slice=table_slice, @@ -520,7 +520,7 @@ def test_iceberg_to_dagster_partition_mapper_changed_time_partition( ), ], ) - updated_partitions = handler.IcebergToDagsterPartitionMapper( + updated_partitions = handler.PartitionMapper( iceberg_table_schema=iceberg_table_schema, iceberg_partition_spec=spec, table_slice=table_slice, @@ -545,7 +545,7 @@ def test_iceberg_to_dagster_partition_mapper_deleted( schema="pytest", partition_dimensions=[], ) - deleted_partitions = handler.IcebergToDagsterPartitionMapper( + deleted_partitions = handler.PartitionMapper( iceberg_table_schema=iceberg_table_schema, iceberg_partition_spec=spec, table_slice=table_slice, @@ -577,7 +577,7 @@ def test_iceberg_table_spec_updater_delete_field( ], ) spec_updater = handler.IcebergTableSpecUpdater( - partition_mapping=handler.IcebergToDagsterPartitionMapper( + partition_mapping=handler.PartitionMapper( iceberg_table_schema=iceberg_table_schema, iceberg_partition_spec=spec, table_slice=table_slice, @@ -611,7 +611,7 @@ def test_iceberg_table_spec_updater_update_field( ], ) spec_updater = handler.IcebergTableSpecUpdater( - partition_mapping=handler.IcebergToDagsterPartitionMapper( + partition_mapping=handler.PartitionMapper( iceberg_table_schema=iceberg_table_schema, iceberg_partition_spec=spec, table_slice=table_slice, @@ -646,7 +646,7 @@ def test_iceberg_table_spec_updater_add_field( ], ) spec_updater = handler.IcebergTableSpecUpdater( - partition_mapping=handler.IcebergToDagsterPartitionMapper( + partition_mapping=handler.PartitionMapper( iceberg_table_schema=iceberg_table_schema, iceberg_partition_spec=spec, table_slice=table_slice, @@ -678,7 +678,7 @@ def test_iceberg_table_spec_updater_fails_with_error_update_mode( ], ) spec_updater = handler.IcebergTableSpecUpdater( - partition_mapping=handler.IcebergToDagsterPartitionMapper( + partition_mapping=handler.PartitionMapper( iceberg_table_schema=iceberg_table_schema, iceberg_partition_spec=spec, table_slice=table_slice,