Skip to content

Commit

Permalink
chore: return iceberg partion spec fields
Browse files Browse the repository at this point in the history
  • Loading branch information
JasperHG90 committed Oct 19, 2024
1 parent 2856524 commit 5cc9567
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 10 deletions.
23 changes: 14 additions & 9 deletions packages/dagster_pyiceberg/src/dagster_pyiceberg/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,14 @@ def new_partition_field_names(self) -> Set[str]:
self.iceberg_table_partition_field_names.values()
)

@property
def deleted_partition_field_names(self) -> Set[str]:
return set(self.iceberg_table_partition_field_names.values()) - set(
self.get_dagster_partition_dimension_names(
allow_empty_dagster_partitions=True
)
)

@property
def dagster_time_partitions(self) -> List[TablePartitionDimension]:
time_partitions = [
Expand Down Expand Up @@ -244,15 +252,12 @@ def updated(self) -> List[TablePartitionDimension]:
if p.partition_expr == self.updated_time_partition_field
]

def deleted(self) -> List[str]:
return list(
set(self.iceberg_table_partition_field_names.values())
- set(
self.get_dagster_partition_dimension_names(
allow_empty_dagster_partitions=True
)
)
)
def deleted(self) -> List[partitioning.PartitionField]:
return [
p
for p in self.iceberg_partition_spec.fields
if p.name in self.deleted_partition_field_names
]


def _update_table_spec(
Expand Down
2 changes: 1 addition & 1 deletion tests/dagster_pyiceberg/test_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,4 +561,4 @@ def test_iceberg_to_dagster_partition_mapper_deleted():
).deleted()

assert len(deleted_partitions) == 2
assert sorted(deleted_partitions) == ["category", "timestamp"]
assert sorted(p.name for p in deleted_partitions) == ["category", "timestamp"]

0 comments on commit 5cc9567

Please sign in to comment.