diff --git a/packages/dagster_pyiceberg/src/dagster_pyiceberg/handler.py b/packages/dagster_pyiceberg/src/dagster_pyiceberg/handler.py index c057f1e..5db1b9f 100644 --- a/packages/dagster_pyiceberg/src/dagster_pyiceberg/handler.py +++ b/packages/dagster_pyiceberg/src/dagster_pyiceberg/handler.py @@ -197,6 +197,14 @@ def new_partition_field_names(self) -> Set[str]: self.iceberg_table_partition_field_names.values() ) + @property + def deleted_partition_field_names(self) -> Set[str]: + return set(self.iceberg_table_partition_field_names.values()) - set( + self.get_dagster_partition_dimension_names( + allow_empty_dagster_partitions=True + ) + ) + @property def dagster_time_partitions(self) -> List[TablePartitionDimension]: time_partitions = [ @@ -244,15 +252,12 @@ def updated(self) -> List[TablePartitionDimension]: if p.partition_expr == self.updated_time_partition_field ] - def deleted(self) -> List[str]: - return list( - set(self.iceberg_table_partition_field_names.values()) - - set( - self.get_dagster_partition_dimension_names( - allow_empty_dagster_partitions=True - ) - ) - ) + def deleted(self) -> List[partitioning.PartitionField]: + return [ + p + for p in self.iceberg_partition_spec.fields + if p.name in self.deleted_partition_field_names + ] def _update_table_spec( diff --git a/tests/dagster_pyiceberg/test_handler.py b/tests/dagster_pyiceberg/test_handler.py index 9921f90..4975556 100644 --- a/tests/dagster_pyiceberg/test_handler.py +++ b/tests/dagster_pyiceberg/test_handler.py @@ -561,4 +561,4 @@ def test_iceberg_to_dagster_partition_mapper_deleted(): ).deleted() assert len(deleted_partitions) == 2 - assert sorted(deleted_partitions) == ["category", "timestamp"] + assert sorted(p.name for p in deleted_partitions) == ["category", "timestamp"]