Skip to content

Commit

Permalink
chore: renames
Browse files Browse the repository at this point in the history
  • Loading branch information
JasperHG90 committed Oct 20, 2024
1 parent 2649fc3 commit d96a999
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 13 deletions.
12 changes: 6 additions & 6 deletions packages/dagster_pyiceberg/src/dagster_pyiceberg/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ def supported_types(self) -> Sequence[Type[object]]:
return (pa.Table, pa.RecordBatchReader)


class IcebergToDagsterPartitionMapper:
class PartitionMapper:

def __init__(
self,
Expand Down Expand Up @@ -264,7 +264,7 @@ def dagster_time_partitions(self) -> List[TablePartitionDimension]:
return time_partitions

@property
def updated_time_partition_field(self) -> str | None:
def updated_dagster_time_partition_field(self) -> str | None:
"""If time partitions present, check whether these have been updated.
This happens when users change e.g. from an hourly to a daily partition."""
# The assumption is that even a multi-partitioned table will have only one time partition
Expand Down Expand Up @@ -300,7 +300,7 @@ def updated(self) -> List[TablePartitionDimension]:
return [
p
for p in self.get_table_slice_partition_dimensions()
if p.partition_expr == self.updated_time_partition_field
if p.partition_expr == self.updated_dagster_time_partition_field
]

def deleted(self) -> List[partitioning.PartitionField]:
Expand All @@ -316,7 +316,7 @@ class IcebergTableSpecUpdater:

def __init__(
self,
partition_mapping: IcebergToDagsterPartitionMapper,
partition_mapping: PartitionMapper,
partition_spec_update_mode: str,
):
self.partition_spec_update_mode = partition_spec_update_mode
Expand Down Expand Up @@ -476,7 +476,7 @@ def _table_writer(
# But this should be a configuration option per table
if partition_dimensions is not None:
IcebergTableSpecUpdater(
partition_mapping=IcebergToDagsterPartitionMapper(
partition_mapping=PartitionMapper(
table_slice=table_slice,
iceberg_table_schema=table.schema(),
iceberg_partition_spec=table.spec(),
Expand All @@ -495,7 +495,7 @@ def _table_writer(
# TODO: add updates and deletes
if partition_dimensions is not None:
IcebergTableSpecUpdater(
partition_mapping=IcebergToDagsterPartitionMapper(
partition_mapping=PartitionMapper(
table_slice=table_slice,
iceberg_table_schema=table.schema(),
iceberg_partition_spec=table.spec(),
Expand Down
14 changes: 7 additions & 7 deletions tests/dagster_pyiceberg/test_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ def test_iceberg_to_dagster_partition_mapper_new_fields(
),
],
)
new_partitions = handler.IcebergToDagsterPartitionMapper(
new_partitions = handler.PartitionMapper(
iceberg_table_schema=iceberg_table_schema,
iceberg_partition_spec=spec,
table_slice=table_slice,
Expand Down Expand Up @@ -520,7 +520,7 @@ def test_iceberg_to_dagster_partition_mapper_changed_time_partition(
),
],
)
updated_partitions = handler.IcebergToDagsterPartitionMapper(
updated_partitions = handler.PartitionMapper(
iceberg_table_schema=iceberg_table_schema,
iceberg_partition_spec=spec,
table_slice=table_slice,
Expand All @@ -545,7 +545,7 @@ def test_iceberg_to_dagster_partition_mapper_deleted(
schema="pytest",
partition_dimensions=[],
)
deleted_partitions = handler.IcebergToDagsterPartitionMapper(
deleted_partitions = handler.PartitionMapper(
iceberg_table_schema=iceberg_table_schema,
iceberg_partition_spec=spec,
table_slice=table_slice,
Expand Down Expand Up @@ -577,7 +577,7 @@ def test_iceberg_table_spec_updater_delete_field(
],
)
spec_updater = handler.IcebergTableSpecUpdater(
partition_mapping=handler.IcebergToDagsterPartitionMapper(
partition_mapping=handler.PartitionMapper(
iceberg_table_schema=iceberg_table_schema,
iceberg_partition_spec=spec,
table_slice=table_slice,
Expand Down Expand Up @@ -611,7 +611,7 @@ def test_iceberg_table_spec_updater_update_field(
],
)
spec_updater = handler.IcebergTableSpecUpdater(
partition_mapping=handler.IcebergToDagsterPartitionMapper(
partition_mapping=handler.PartitionMapper(
iceberg_table_schema=iceberg_table_schema,
iceberg_partition_spec=spec,
table_slice=table_slice,
Expand Down Expand Up @@ -646,7 +646,7 @@ def test_iceberg_table_spec_updater_add_field(
],
)
spec_updater = handler.IcebergTableSpecUpdater(
partition_mapping=handler.IcebergToDagsterPartitionMapper(
partition_mapping=handler.PartitionMapper(
iceberg_table_schema=iceberg_table_schema,
iceberg_partition_spec=spec,
table_slice=table_slice,
Expand Down Expand Up @@ -678,7 +678,7 @@ def test_iceberg_table_spec_updater_fails_with_error_update_mode(
],
)
spec_updater = handler.IcebergTableSpecUpdater(
partition_mapping=handler.IcebergToDagsterPartitionMapper(
partition_mapping=handler.PartitionMapper(
iceberg_table_schema=iceberg_table_schema,
iceberg_partition_spec=spec,
table_slice=table_slice,
Expand Down

0 comments on commit d96a999

Please sign in to comment.