Skip to content

Commit aa27a48

Browse files
committed
cache manifests
1 parent 65a03d2 commit aa27a48

File tree

7 files changed: +26 -18 lines changed

pyiceberg/catalog/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -717,7 +717,7 @@ def purge_table(self, identifier: Union[str, Identifier]) -> None:
717717
manifest_lists_to_delete = set()
718718
manifests_to_delete: List[ManifestFile] = []
719719
for snapshot in metadata.snapshots:
720-
manifests_to_delete += snapshot.manifests(io)
720+
manifests_to_delete += snapshot.manifests(io, snapshot.manifest_list)
721721
if snapshot.manifest_list is not None:
722722
manifest_lists_to_delete.add(snapshot.manifest_list)
723723

pyiceberg/cli/output.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ def files(self, table: Table, history: bool) -> None:
144144
manifest_list_str = f": {snapshot.manifest_list}" if snapshot.manifest_list else ""
145145
list_tree = snapshot_tree.add(f"Snapshot {snapshot.snapshot_id}, schema {snapshot.schema_id}{manifest_list_str}")
146146

147-
manifest_list = snapshot.manifests(io)
147+
manifest_list = snapshot.manifests(io, snapshot.manifest_list)
148148
for manifest in manifest_list:
149149
manifest_tree = list_tree.add(f"Manifest: {manifest.manifest_path}")
150150
for manifest_entry in manifest.fetch_manifest_entry(io, discard_deleted=False):

pyiceberg/table/__init__.py

+6-6
Original file line numberDiff line numberDiff line change
@@ -1696,7 +1696,7 @@ def plan_files(self) -> Iterable[FileScanTask]:
16961696

16971697
manifests = [
16981698
manifest_file
1699-
for manifest_file in snapshot.manifests(self.io)
1699+
for manifest_file in snapshot.manifests(self.io, snapshot.manifest_list)
17001700
if manifest_evaluators[manifest_file.partition_spec_id](manifest_file)
17011701
]
17021702

@@ -2929,7 +2929,7 @@ def _existing_manifests(self) -> List[ManifestFile]:
29292929
if previous_snapshot is None:
29302930
raise ValueError(f"Snapshot could not be found: {self._parent_snapshot_id}")
29312931

2932-
for manifest in previous_snapshot.manifests(io=self._io):
2932+
for manifest in previous_snapshot.manifests(io=self._io, manifest_list=previous_snapshot.manifest_list):
29332933
if manifest.has_added_files() or manifest.has_existing_files() or manifest.added_snapshot_id == self._snapshot_id:
29342934
existing_manifests.append(manifest)
29352935

@@ -2980,7 +2980,7 @@ def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]:
29802980
if entry.data_file.content == DataFileContent.DATA
29812981
]
29822982

2983-
list_of_entries = executor.map(_get_entries, previous_snapshot.manifests(self._io))
2983+
list_of_entries = executor.map(_get_entries, previous_snapshot.manifests(self._io, previous_snapshot.manifest_list))
29842984
return list(chain(*list_of_entries))
29852985
else:
29862986
return []
@@ -3372,7 +3372,7 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
33723372

33733373
entries = []
33743374
snapshot = self._get_snapshot(snapshot_id)
3375-
for manifest in snapshot.manifests(self.tbl.io):
3375+
for manifest in snapshot.manifests(self.tbl.io, snapshot.manifest_list):
33763376
for entry in manifest.fetch_manifest_entry(io=self.tbl.io):
33773377
column_sizes = entry.data_file.column_sizes or {}
33783378
value_counts = entry.data_file.value_counts or {}
@@ -3534,7 +3534,7 @@ def update_partitions_map(
35343534

35353535
partitions_map: Dict[Tuple[str, Any], Any] = {}
35363536
snapshot = self._get_snapshot(snapshot_id)
3537-
for manifest in snapshot.manifests(self.tbl.io):
3537+
for manifest in snapshot.manifests(self.tbl.io, snapshot.manifest_list):
35383538
for entry in manifest.fetch_manifest_entry(io=self.tbl.io):
35393539
partition = entry.data_file.partition
35403540
partition_record_dict = {
@@ -3612,7 +3612,7 @@ def _partition_summaries_to_rows(
36123612
specs = self.tbl.metadata.specs()
36133613
manifests = []
36143614
if snapshot := self.tbl.metadata.current_snapshot():
3615-
for manifest in snapshot.manifests(self.tbl.io):
3615+
for manifest in snapshot.manifests(self.tbl.io, snapshot.manifest_list):
36163616
is_data_file = manifest.content == ManifestContent.DATA
36173617
is_delete_file = manifest.content == ManifestContent.DELETES
36183618
manifests.append({

pyiceberg/table/snapshots.py

+7-3
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import time
1818
from collections import defaultdict
1919
from enum import Enum
20+
from functools import lru_cache
2021
from typing import Any, DefaultDict, Dict, List, Mapping, Optional
2122

2223
from pydantic import Field, PrivateAttr, model_serializer
@@ -242,9 +243,12 @@ def __str__(self) -> str:
242243
result_str = f"{operation}id={self.snapshot_id}{parent_id}{schema_id}"
243244
return result_str
244245

245-
def manifests(self, io: FileIO) -> List[ManifestFile]:
246-
if self.manifest_list is not None:
247-
file = io.new_input(self.manifest_list)
246+
@staticmethod
247+
@lru_cache
248+
def manifests(io: FileIO, manifest_list: str) -> List[ManifestFile]:
249+
"""Return the manifests read from the given manifest list file (an empty list if none is set)."""
250+
if manifest_list not in (None, ""):
251+
file = io.new_input(manifest_list)
248252
return list(read_manifest_list(file))
249253
return []
250254

tests/integration/test_partitioning_key.py

+6-2
Original file line numberDiff line numberDiff line change
@@ -763,10 +763,14 @@ def test_partition_key(
763763
snapshot = iceberg_table.current_snapshot()
764764
assert snapshot
765765
spark_partition_for_justification = (
766-
snapshot.manifests(iceberg_table.io)[0].fetch_manifest_entry(iceberg_table.io)[0].data_file.partition
766+
snapshot.manifests(iceberg_table.io, snapshot.manifest_list)[0]
767+
.fetch_manifest_entry(iceberg_table.io)[0]
768+
.data_file.partition
767769
)
768770
spark_path_for_justification = (
769-
snapshot.manifests(iceberg_table.io)[0].fetch_manifest_entry(iceberg_table.io)[0].data_file.file_path
771+
snapshot.manifests(iceberg_table.io, snapshot.manifest_list)[0]
772+
.fetch_manifest_entry(iceberg_table.io)[0]
773+
.data_file.file_path
770774
)
771775
assert spark_partition_for_justification == expected_partition_record
772776
assert expected_hive_partition_path_slice in spark_path_for_justification

tests/integration/test_rest_manifest.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ def test_write_sample_manifest(table_test_all_types: Table) -> None:
7575
if test_snapshot is None:
7676
raise ValueError("Table has no current snapshot, check the docker environment")
7777
io = table_test_all_types.io
78-
test_manifest_file = test_snapshot.manifests(io)[0]
78+
test_manifest_file = test_snapshot.manifests(io, test_snapshot.manifest_list)[0]
7979
test_manifest_entries = test_manifest_file.fetch_manifest_entry(io)
8080
entry = test_manifest_entries[0]
8181
test_schema = table_test_all_types.schema()

tests/utils/test_manifest.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ def test_read_manifest_v1(generated_manifest_file_file_v1: str) -> None:
217217
summary=Summary(Operation.APPEND),
218218
schema_id=3,
219219
)
220-
manifest_list = snapshot.manifests(io)[0]
220+
manifest_list = snapshot.manifests(io, snapshot.manifest_list)[0]
221221

222222
assert manifest_list.manifest_length == 7989
223223
assert manifest_list.partition_spec_id == 0
@@ -267,7 +267,7 @@ def test_read_manifest_v2(generated_manifest_file_file_v2: str) -> None:
267267
summary=Summary(Operation.APPEND),
268268
schema_id=3,
269269
)
270-
manifest_list = snapshot.manifests(io)[0]
270+
manifest_list = snapshot.manifests(io, manifest_list=snapshot.manifest_list)[0]
271271

272272
assert manifest_list.manifest_length == 7989
273273
assert manifest_list.partition_spec_id == 0
@@ -319,7 +319,7 @@ def test_write_manifest(
319319
summary=Summary(Operation.APPEND),
320320
schema_id=3,
321321
)
322-
demo_manifest_file = snapshot.manifests(io)[0]
322+
demo_manifest_file = snapshot.manifests(io, snapshot.manifest_list)[0]
323323
manifest_entries = demo_manifest_file.fetch_manifest_entry(io)
324324
test_schema = Schema(
325325
NestedField(1, "VendorID", IntegerType(), False), NestedField(2, "tpep_pickup_datetime", IntegerType(), False)
@@ -491,7 +491,7 @@ def test_write_manifest_list(
491491
schema_id=3,
492492
)
493493

494-
demo_manifest_list = snapshot.manifests(io)
494+
demo_manifest_list = snapshot.manifests(io, snapshot.manifest_list)
495495
with TemporaryDirectory() as tmp_dir:
496496
path = tmp_dir + "/manifest-list.avro"
497497
output = io.new_output(path)

0 commit comments

Comments
 (0)