From 53549c0d4ba9cab060c063b21d7df6796259b3c7 Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Fri, 12 Apr 2024 19:53:57 +0000 Subject: [PATCH 1/5] codespaces is weird - checkpoint --- pyiceberg/table/__init__.py | 54 +++++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index ea813176fc..4738fdfdf4 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -137,6 +137,7 @@ ) from pyiceberg.utils.concurrent import ExecutorFactory from pyiceberg.utils.datetime import datetime_to_millis +from pyiceberg.utils.singleton import _convert_to_hashable_type if TYPE_CHECKING: import daft @@ -3411,6 +3412,59 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType: ) + def partitions(self) -> "pa.Table": + import pyarrow as pa + + from pyiceberg.io.pyarrow import schema_to_pyarrow + + partition_record = self.tbl.metadata.specs_struct() + pa_record_struct = schema_to_pyarrow(partition_record) + + partitions_schema = pa.schema([ + pa.field('partition', pa_record_struct, nullable=False), + pa.field('spec_id', pa.int32(), nullable=False), + pa.field('record_count', pa.int64(), nullable=False), + pa.field('file_count', pa.int32(), nullable=False), + pa.field('total_data_file_size_in_bytes', pa.int64(), nullable=False), + pa.field('position_delete_record_count', pa.int64(), nullable=False), + pa.field('position_delete_file_count', pa.int32(), nullable=False), + pa.field('equality_delete_record_count', pa.int64(), nullable=False), + pa.field('equality_delete_file_count', pa.int32(), nullable=False), + pa.field('last_updated_at', pa.timestamp(unit='ms'), nullable=False), + pa.field('last_updated_snapshot_id', pa.int64(), nullable=False), + ]) + + def update_partitions_map(partitions_map: Tuple[str, Any], file: DataFile, partition_record_dict: Dict[str, Any]) -> None: + partition_record_key = _convert_to_hashable_type(partition_record_dict) + if partition_row := partitions_map.get(partition_record_key): + if file.content == DataFileContent.DATA: + print() + elif file.content == DataFileContent.POSITION_DELETES: + print() + elif file.content == DataFileContent.EQUALITY_DELETES: + print() + else: + raise ValueError(f"Unknown DataFileContent ({file.content})") + + + + partitions_map = dict() + if snapshot := self.tbl.metadata.current_snapshot(): + for manifest in snapshot.manifests(self.tbl.io): + for entry in manifest.fetch_manifest_entry(io=self.tbl.io): + partition = entry.data_file.partition + partition_record_dict = { + field.name: partition[pos] + for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields) + } + update_partitions_map(partitions_map, entry.data_file, partition_record_dict) + + return pa.Table.from_pylist( + partitions_map.values(), + schema=partitions_schema, + ) + + @dataclass(frozen=True) class TablePartition: partition_key: PartitionKey From 70308c9991c9f3420ea383f0960f8d35c5168495 Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Fri, 12 Apr 2024 21:23:52 +0000 Subject: [PATCH 2/5] partitions table --- pyiceberg/table/__init__.py | 62 +++++++++++++++++++++++++++---------- 1 file changed, 45 insertions(+), 17 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 4738fdfdf4..43e778f823 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -3411,7 +3411,6 @@ def _readable_metrics_struct(bound_type: 
PrimitiveType) -> pa.StructType: schema=entries_schema, ) - def partitions(self) -> "pa.Table": import pyarrow as pa @@ -3419,7 +3418,7 @@ def partitions(self) -> "pa.Table": partition_record = self.tbl.metadata.specs_struct() pa_record_struct = schema_to_pyarrow(partition_record) - + partitions_schema = pa.schema([ pa.field('partition', pa_record_struct, nullable=False), pa.field('spec_id', pa.int32(), nullable=False), @@ -3430,25 +3429,53 @@ def partitions(self) -> "pa.Table": pa.field('position_delete_file_count', pa.int32(), nullable=False), pa.field('equality_delete_record_count', pa.int64(), nullable=False), pa.field('equality_delete_file_count', pa.int32(), nullable=False), - pa.field('last_updated_at', pa.timestamp(unit='ms'), nullable=False), - pa.field('last_updated_snapshot_id', pa.int64(), nullable=False), + pa.field('last_updated_at', pa.timestamp(unit='ms'), nullable=True), + pa.field('last_updated_snapshot_id', pa.int64(), nullable=True), ]) - def update_partitions_map(partitions_map: Tuple[str, Any], file: DataFile, partition_record_dict: Dict[str, Any]) -> None: + def update_partitions_map( + partitions_map: Dict[Tuple[str, Any], Any], + file: DataFile, + partition_record_dict: Dict[str, Any], + snapshot: Optional[Snapshot], + ) -> None: partition_record_key = _convert_to_hashable_type(partition_record_dict) - if partition_row := partitions_map.get(partition_record_key): - if file.content == DataFileContent.DATA: - print() - elif file.content == DataFileContent.POSITION_DELETES: - print() - elif file.content == DataFileContent.EQUALITY_DELETES: - print() - else: - raise ValueError(f"Unknown DataFileContent ({file.content})") - + if partition_record_key not in partitions_map: + partitions_map[partition_record_key] = { + "partition": partition_record_dict, + "spec_id": file.spec_id, + "record_count": 0, + "file_count": 0, + "total_data_file_size_in_bytes": 0, + "position_delete_record_count": 0, + "position_delete_file_count": 0, + "equality_delete_record_count": 0, + "equality_delete_file_count": 0, + "last_updated_at": snapshot.timestamp_ms if snapshot else None, + "last_updated_snapshot_id": snapshot.snapshot_id if snapshot else None, + } + + partition_row = partitions_map[partition_record_key] + if snapshot is not None: + if partition_row["last_updated_at"] is None or partition_row["last_updated_snapshot_id"] < snapshot.timestamp_ms: + partition_row["last_updated_at"] = snapshot.timestamp_ms + partition_row["last_updated_snapshot_id"] = snapshot.snapshot_id + + if file.content == DataFileContent.DATA: + partition_row["record_count"] += file.record_count + partition_row["file_count"] += 1 + partition_row["total_data_file_size_in_bytes"] += file.file_size_in_bytes + elif file.content == DataFileContent.POSITION_DELETES: + partition_row["position_delete_record_count"] += file.record_count + partition_row["position_delete_file_count"] += 1 + elif file.content == DataFileContent.EQUALITY_DELETES: + partition_row["equality_delete_record_count"] += file.record_count + partition_row["equality_delete_file_count"] += 1 + else: + raise ValueError(f"Unknown DataFileContent ({file.content})") - partitions_map = dict() + partitions_map: Dict[Tuple[str, Any], Any] = {} if snapshot := self.tbl.metadata.current_snapshot(): for manifest in snapshot.manifests(self.tbl.io): for entry in manifest.fetch_manifest_entry(io=self.tbl.io): @@ -3457,7 +3484,8 @@ def update_partitions_map(partitions_map: Tuple[str, Any], file: DataFile, parti field.name: partition[pos] for pos, field in 
enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields) } - update_partitions_map(partitions_map, entry.data_file, partition_record_dict) + entry_snapshot = self.tbl.snapshot_by_id(entry.snapshot_id) if entry.snapshot_id is not None else None + update_partitions_map(partitions_map, entry.data_file, partition_record_dict, entry_snapshot) return pa.Table.from_pylist( partitions_map.values(), From 6f4f0f33d7d6ebd20944717ab915108fa5978c8c Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Sat, 13 Apr 2024 18:36:28 +0000 Subject: [PATCH 3/5] support partitions metadata table --- pyiceberg/table/__init__.py | 21 ++-- tests/integration/test_inspect_table.py | 121 +++++++++++++++++++++++- 2 files changed, 132 insertions(+), 10 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 43e778f823..301e67e890 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -3416,12 +3416,7 @@ def partitions(self) -> "pa.Table": from pyiceberg.io.pyarrow import schema_to_pyarrow - partition_record = self.tbl.metadata.specs_struct() - pa_record_struct = schema_to_pyarrow(partition_record) - - partitions_schema = pa.schema([ - pa.field('partition', pa_record_struct, nullable=False), - pa.field('spec_id', pa.int32(), nullable=False), + table_schema = pa.schema([ pa.field('record_count', pa.int64(), nullable=False), pa.field('file_count', pa.int32(), nullable=False), pa.field('total_data_file_size_in_bytes', pa.int64(), nullable=False), @@ -3433,6 +3428,18 @@ def partitions(self) -> "pa.Table": pa.field('last_updated_snapshot_id', pa.int64(), nullable=True), ]) + partition_record = self.tbl.metadata.specs_struct() + has_partitions = len(partition_record.fields) > 0 + + if has_partitions: + pa_record_struct = schema_to_pyarrow(partition_record) + partitions_schema = pa.schema([ + pa.field('partition', pa_record_struct, nullable=False), + pa.field('spec_id', pa.int32(), nullable=False), + ]) + + table_schema = pa.unify_schemas([partitions_schema, table_schema]) + def update_partitions_map( partitions_map: Dict[Tuple[str, Any], Any], file: DataFile, @@ -3489,7 +3496,7 @@ def update_partitions_map( return pa.Table.from_pylist( partitions_map.values(), - schema=partitions_schema, + schema=table_schema, ) diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py index f2515caee8..9b4c64e375 100644 --- a/tests/integration/test_inspect_table.py +++ b/tests/integration/test_inspect_table.py @@ -164,9 +164,6 @@ def test_inspect_entries( for value in df[int_column]: assert isinstance(value.as_py(), int) - for snapshot_id in df['snapshot_id']: - assert isinstance(snapshot_id.as_py(), int) - lhs = df.to_pandas() rhs = spark.table(f"{identifier}.entries").toPandas() for column in df.column_names: @@ -266,3 +263,121 @@ def test_inspect_entries_partitioned(spark: SparkSession, session_catalog: Catal assert df.to_pydict()['data_file'][0]['partition'] == {'dt_day': date(2021, 2, 1), 'dt_month': None} assert df.to_pydict()['data_file'][1]['partition'] == {'dt_day': None, 'dt_month': 612} + + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_inspect_partitions_unpartitioned( + spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int +) -> None: + identifier = "default.table_metadata_partitions_unpartitioned" + tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version}) + + # 
Write some data through multiple commits + tbl.append(arrow_table_with_null) + tbl.append(arrow_table_with_null) + + df = tbl.inspect.partitions() + assert df.column_names == [ + 'record_count', + 'file_count', + 'total_data_file_size_in_bytes', + 'position_delete_record_count', + 'position_delete_file_count', + 'equality_delete_record_count', + 'equality_delete_file_count', + 'last_updated_at', + 'last_updated_snapshot_id', + ] + for last_updated_at in df['last_updated_at']: + assert isinstance(last_updated_at.as_py(), datetime) + + int_cols = [ + 'record_count', + 'file_count', + 'total_data_file_size_in_bytes', + 'position_delete_record_count', + 'position_delete_file_count', + 'equality_delete_record_count', + 'equality_delete_file_count', + 'last_updated_snapshot_id', + ] + for column in int_cols: + for value in df[column]: + assert isinstance(value.as_py(), int) + lhs = df.to_pandas() + rhs = spark.table(f"{identifier}.partitions").toPandas() + for column in df.column_names: + for left, right in zip(lhs[column].to_list(), rhs[column].to_list()): + assert left == right, f"Difference in column {column}: {left} != {right}" + + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_inspect_partitions_partitioned(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: + identifier = "default.table_metadata_partitions_partitioned" + try: + session_catalog.drop_table(identifier=identifier) + except NoSuchTableError: + pass + + spark.sql( + f""" + CREATE TABLE {identifier} ( + name string, + dt date + ) + PARTITIONED BY (months(dt)) + """ + ) + + spark.sql( + f""" + INSERT INTO {identifier} VALUES ('John', CAST('2021-01-01' AS date)) + """ + ) + + spark.sql( + f""" + INSERT INTO {identifier} VALUES ('Doe', CAST('2021-01-05' AS date)) + """ + ) + + spark.sql( + f""" + ALTER TABLE {identifier} + REPLACE PARTITION FIELD dt_month WITH days(dt) + """ + ) + + spark.sql( + f""" + INSERT INTO {identifier} VALUES ('Jenny', CAST('2021-02-01' AS date)) + """ + ) + + spark.sql( + f""" + ALTER TABLE {identifier} + DROP PARTITION FIELD dt_day + """ + ) + + spark.sql( + f""" + INSERT INTO {identifier} VALUES ('James', CAST('2021-02-01' AS date)) + """ + ) + + df = session_catalog.load_table(identifier).inspect.partitions() + + lhs = df.to_pandas() + rhs = spark.table(f"{identifier}.partitions").toPandas() + + lhs.sort_values('spec_id', inplace=True) + rhs.sort_values('spec_id', inplace=True) + for column in df.column_names: + for left, right in zip(lhs[column].to_list(), rhs[column].to_list()): + if column == "partition": + right = right.asDict() + assert left == right, f"Difference in column {column}: {left} != {right}" From d5e1695298d65f0b8a848e8b94aa651813039204 Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Sat, 13 Apr 2024 19:03:46 +0000 Subject: [PATCH 4/5] docs --- mkdocs/docs/api.md | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md index 15931d02fb..43d3871a52 100644 --- a/mkdocs/docs/api.md +++ b/mkdocs/docs/api.md @@ -370,6 +370,47 @@ manifest_list: [["s3://warehouse/default/table_metadata_snapshots/metadata/snap- summary: 
[[keys:["added-files-size","added-data-files","added-records","total-data-files","total-delete-files","total-records","total-files-size","total-position-deletes","total-equality-deletes"]values:["5459","1","3","1","0","3","5459","0","0"],keys:["added-files-size","added-data-files","added-records","total-data-files","total-records",...,"total-equality-deletes","total-files-size","deleted-data-files","deleted-records","removed-files-size"]values:["5459","1","3","1","3",...,"0","5459","1","3","5459"],keys:["added-files-size","added-data-files","added-records","total-data-files","total-delete-files","total-records","total-files-size","total-position-deletes","total-equality-deletes"]values:["5459","1","3","2","0","6","10918","0","0"]]] ``` +### Partitions + +Inspect the partitions of the table: + +```python +table.inspect.partitions() +``` + +``` +pyarrow.Table +partition: struct not null + child 0, dt_month: int32 + child 1, dt_day: date32[day] +spec_id: int32 not null +record_count: int64 not null +file_count: int32 not null +total_data_file_size_in_bytes: int64 not null +position_delete_record_count: int64 not null +position_delete_file_count: int32 not null +equality_delete_record_count: int64 not null +equality_delete_file_count: int32 not null +last_updated_at: timestamp[ms] +last_updated_snapshot_id: int64 +---- +partition: [ + -- is_valid: all not null + -- child 0 type: int32 +[null,null,612] + -- child 1 type: date32[day] +[null,2021-02-01,null]] +spec_id: [[2,1,0]] +record_count: [[1,1,2]] +file_count: [[1,1,2]] +total_data_file_size_in_bytes: [[641,641,1260]] +position_delete_record_count: [[0,0,0]] +position_delete_file_count: [[0,0,0]] +equality_delete_record_count: [[0,0,0]] +equality_delete_file_count: [[0,0,0]] +last_updated_at: [[2024-04-13 18:59:35.981,2024-04-13 18:59:35.465,2024-04-13 18:59:35.003]] +``` + ### Entries To show all the table's current manifest entries for both data and delete files. 
From 9a26b7b0d0030819c2e996b6b306f2c211eecaeb Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Sat, 13 Apr 2024 19:25:34 +0000 Subject: [PATCH 5/5] support time travel --- pyiceberg/table/__init__.py | 22 +++++++++++----------- tests/integration/test_inspect_table.py | 24 +++++++++++++----------- 2 files changed, 24 insertions(+), 22 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 7c26197bb3..95fdb1d288 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -3423,7 +3423,7 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType: schema=entries_schema, ) - def partitions(self) -> "pa.Table": + def partitions(self, snapshot_id: Optional[int] = None) -> "pa.Table": import pyarrow as pa from pyiceberg.io.pyarrow import schema_to_pyarrow @@ -3495,16 +3495,16 @@ def update_partitions_map( raise ValueError(f"Unknown DataFileContent ({file.content})") partitions_map: Dict[Tuple[str, Any], Any] = {} - if snapshot := self.tbl.metadata.current_snapshot(): - for manifest in snapshot.manifests(self.tbl.io): - for entry in manifest.fetch_manifest_entry(io=self.tbl.io): - partition = entry.data_file.partition - partition_record_dict = { - field.name: partition[pos] - for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields) - } - entry_snapshot = self.tbl.snapshot_by_id(entry.snapshot_id) if entry.snapshot_id is not None else None - update_partitions_map(partitions_map, entry.data_file, partition_record_dict, entry_snapshot) + snapshot = self._get_snapshot(snapshot_id) + for manifest in snapshot.manifests(self.tbl.io): + for entry in manifest.fetch_manifest_entry(io=self.tbl.io): + partition = entry.data_file.partition + partition_record_dict = { + field.name: partition[pos] + for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields) + } + entry_snapshot = self.tbl.snapshot_by_id(entry.snapshot_id) if entry.snapshot_id is not None else None + update_partitions_map(partitions_map, entry.data_file, partition_record_dict, entry_snapshot) return pa.Table.from_pylist( partitions_map.values(), diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py index a750fdf129..d9ec563466 100644 --- a/tests/integration/test_inspect_table.py +++ b/tests/integration/test_inspect_table.py @@ -376,15 +376,17 @@ def test_inspect_partitions_partitioned(spark: SparkSession, session_catalog: Ca """ ) - df = session_catalog.load_table(identifier).inspect.partitions() - - lhs = df.to_pandas() - rhs = spark.table(f"{identifier}.partitions").toPandas() + def check_pyiceberg_df_equals_spark_df(df: pa.Table, spark_df: DataFrame) -> None: + lhs = df.to_pandas().sort_values('spec_id') + rhs = spark_df.toPandas().sort_values('spec_id') + for column in df.column_names: + for left, right in zip(lhs[column].to_list(), rhs[column].to_list()): + if column == "partition": + right = right.asDict() + assert left == right, f"Difference in column {column}: {left} != {right}" - lhs.sort_values('spec_id', inplace=True) - rhs.sort_values('spec_id', inplace=True) - for column in df.column_names: - for left, right in zip(lhs[column].to_list(), rhs[column].to_list()): - if column == "partition": - right = right.asDict() - assert left == right, f"Difference in column {column}: {left} != {right}" + tbl = session_catalog.load_table(identifier) + for snapshot in tbl.metadata.snapshots: + df = 
tbl.inspect.partitions(snapshot_id=snapshot.snapshot_id) + spark_df = spark.sql(f"SELECT * FROM {identifier}.partitions VERSION AS OF {snapshot.snapshot_id}") + check_pyiceberg_df_equals_spark_df(df, spark_df)
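
With the final patch, `partitions()` accepts an optional `snapshot_id`, resolved through `self._get_snapshot(snapshot_id)`, so the partition summary can be computed as of any snapshot. A minimal usage sketch under assumed names — the catalog name and table identifier below are placeholders, not taken from the patches:

```python
from pyiceberg.catalog import load_catalog

# Hypothetical catalog/table names; substitute your own.
catalog = load_catalog("default")
tbl = catalog.load_table("default.my_partitioned_table")

# Partition summary for the current snapshot.
current = tbl.inspect.partitions()

# Time travel: partition summary as of every snapshot in the table's history,
# mirroring the integration test's Spark comparison
# `SELECT * FROM tbl.partitions VERSION AS OF <snapshot_id>`.
for snapshot in tbl.metadata.snapshots:
    historical = tbl.inspect.partitions(snapshot_id=snapshot.snapshot_id)
    print(snapshot.snapshot_id, historical.num_rows)
```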