From 0b3f1d53369c5013bfd68985c6ac49e88bbdae03 Mon Sep 17 00:00:00 2001 From: Vamsi Date: Wed, 8 Jan 2025 21:59:40 +0530 Subject: [PATCH] Add custom iceberg metadata cleaner --- .../apache/xtable/conversion/TargetTable.java | 3 + .../iceberg/IcebergConversionTarget.java | 45 +++- .../IcebergMetadataCleanupStrategy.java | 94 +++++++ .../iceberg/IcebergMetadataFileCleaner.java | 236 ++++++++++++++++++ 4 files changed, 369 insertions(+), 9 deletions(-) create mode 100644 xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergMetadataCleanupStrategy.java create mode 100644 xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergMetadataFileCleaner.java diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java b/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java index 6256da2c6..54689453f 100644 --- a/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java +++ b/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java @@ -30,6 +30,7 @@ @EqualsAndHashCode(callSuper = true) public class TargetTable extends ExternalTable { private final Duration metadataRetention; + private final boolean useInternalMetadataCleaner; @Builder(toBuilder = true) public TargetTable( @@ -39,9 +40,11 @@ public TargetTable( String[] namespace, CatalogConfig catalogConfig, Duration metadataRetention, + boolean useInternalMetadataCleaner, Properties additionalProperties) { super(name, formatName, basePath, namespace, catalogConfig, additionalProperties); this.metadataRetention = metadataRetention == null ? Duration.of(7, ChronoUnit.DAYS) : metadataRetention; + this.useInternalMetadataCleaner = useInternalMetadataCleaner; } } diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java index ecdbfa261..1fd3d1aaf 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java @@ -15,19 +15,22 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - + package org.apache.xtable.iceberg; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.List; import java.util.Optional; +import java.util.concurrent.ExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.log4j.Log4j2; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.iceberg.ExpireSnapshots; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; @@ -39,6 +42,7 @@ import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.NotFoundException; +import org.apache.iceberg.util.ThreadPools; import org.apache.xtable.conversion.TargetTable; import org.apache.xtable.model.InternalTable; import org.apache.xtable.model.metadata.TableSyncMetadata; @@ -51,6 +55,10 @@ @Log4j2 public class IcebergConversionTarget implements ConversionTarget { + + private static final ExecutorService DEFAULT_DELETE_EXECUTOR_SERVICE = + MoreExecutors.newDirectExecutorService(); + private static final String METADATA_DIR_PATH = "/metadata/"; private IcebergSchemaExtractor schemaExtractor; private IcebergSchemaSync schemaSync; @@ -66,8 +74,10 @@ public class IcebergConversionTarget implements ConversionTarget { private Transaction transaction; private Table table; private InternalTable internalTableState; + private boolean useInternalMetadataCleaner; - public IcebergConversionTarget() {} + private final ExecutorService deleteExecutorService = DEFAULT_DELETE_EXECUTOR_SERVICE; + private final ExecutorService planExecutorService = ThreadPools.getWorkerPool(); IcebergConversionTarget( TargetTable targetTable, @@ -107,6 +117,7 @@ private void _init( this.basePath = targetTable.getBasePath(); this.configuration = configuration; this.snapshotRetentionInHours = (int) targetTable.getMetadataRetention().toHours(); + this.useInternalMetadataCleaner = targetTable.isUseInternalMetadataCleaner(); String[] namespace = targetTable.getNamespace(); this.tableIdentifier = namespace == null @@ -211,18 +222,34 @@ public void syncFilesForDiff(DataFilesDiff dataFilesDiff) { @Override public void completeSync() { - transaction - .expireSnapshots() - .expireOlderThan( - Instant.now().minus(snapshotRetentionInHours, ChronoUnit.HOURS).toEpochMilli()) - .deleteWith(this::safeDelete) // ensures that only metadata files are deleted - .cleanExpiredFiles(true) - .commit(); + boolean useInternalIcebergCleaner = useInternalCleaner(); + ExpireSnapshots expireSnapshots = + transaction + .expireSnapshots() + .expireOlderThan( + Instant.now().minus(snapshotRetentionInHours, ChronoUnit.HOURS).toEpochMilli()) + .cleanExpiredFiles(!useInternalIcebergCleaner); // is internal cleaner is enabled, disable iceberg cleaner + List removedSnapshots = expireSnapshots.apply(); + expireSnapshots.commit(); transaction.commitTransaction(); + // after commit is complete, clean up the manifest files + if (useInternalIcebergCleaner) { + cleanExpiredSnapshots(removedSnapshots); + } transaction = null; internalTableState = null; } + private boolean useInternalCleaner() { + return useInternalMetadataCleaner && table.refs().size() == 1; + } + + private void cleanExpiredSnapshots(List removedSnapshots) { + IcebergMetadataCleanupStrategy cleanupStrategy = new IcebergMetadataFileCleaner(transaction.table().io(), deleteExecutorService, planExecutorService, this::safeDelete); + cleanupStrategy.cleanFiles(table, removedSnapshots); + } + + private void safeDelete(String file) { if (file.startsWith(new Path(basePath) + METADATA_DIR_PATH)) { table.io().deleteFile(file); diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergMetadataCleanupStrategy.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergMetadataCleanupStrategy.java new file mode 100644 index 000000000..063f4c3ee --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergMetadataCleanupStrategy.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.iceberg; + +import lombok.extern.log4j.Log4j2; +import org.apache.iceberg.GenericManifestFile; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.exceptions.NotFoundException; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.util.Tasks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.function.Consumer; + +@Log4j2 +abstract class IcebergMetadataCleanupStrategy { + private static final Logger LOG = LoggerFactory.getLogger(IcebergMetadataCleanupStrategy.class); + + protected final FileIO fileIO; + protected final ExecutorService planExecutorService; + private final Consumer deleteFunc; + private final ExecutorService deleteExecutorService; + + protected IcebergMetadataCleanupStrategy( + FileIO fileIO, + ExecutorService deleteExecutorService, + ExecutorService planExecutorService, + Consumer deleteFunc) { + this.fileIO = fileIO; + this.deleteExecutorService = deleteExecutorService; + this.planExecutorService = planExecutorService; + this.deleteFunc = deleteFunc; + } + + public abstract void cleanFiles(Table table, List removedSnapshots); + + private static final Schema MANIFEST_PROJECTION = + ManifestFile.schema() + .select( + "manifest_path", + "manifest_length", + "partition_spec_id", + "added_snapshot_id", + "deleted_data_files_count"); + + protected CloseableIterable readManifests(Snapshot snapshot) { + if (snapshot.manifestListLocation() != null) { + return Avro.read(fileIO.newInputFile(snapshot.manifestListLocation())) + .rename("manifest_file", GenericManifestFile.class.getName()) + .classLoader(GenericManifestFile.class.getClassLoader()) + .project(MANIFEST_PROJECTION) + .reuseContainers(true) + .build(); + } else { + return CloseableIterable.withNoopClose(snapshot.allManifests(fileIO)); + } + } + + protected void deleteFiles(Set pathsToDelete, String fileType) { + Tasks.foreach(pathsToDelete) + .executeWith(deleteExecutorService) + .retry(3) + .stopRetryOn(NotFoundException.class) + .suppressFailureWhenFinished() + .onFailure( + (file, thrown) -> LOG.warn("Delete failed for {} file: {}", fileType, file, thrown)) + .run(deleteFunc::accept); + } +} \ No newline at end of file diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergMetadataFileCleaner.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergMetadataFileCleaner.java new file mode 100644 index 000000000..0a501251b --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergMetadataFileCleaner.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.iceberg; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.function.Consumer; +import java.util.function.Function; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; +import lombok.extern.log4j.Log4j2; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotRef; +import org.apache.iceberg.SnapshotSummary; +import org.apache.iceberg.StatisticsFile; +import org.apache.iceberg.Table; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.util.SnapshotUtil; +import org.apache.iceberg.util.Tasks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Log4j2 +public class IcebergMetadataFileCleaner extends IcebergMetadataCleanupStrategy { + private static final Logger LOG = LoggerFactory.getLogger(IcebergMetadataFileCleaner.class); + + IcebergMetadataFileCleaner( + FileIO fileIO, + ExecutorService deleteExecutorService, + ExecutorService planExecutorService, + Consumer deleteFunc) { + super(fileIO, deleteExecutorService, planExecutorService, deleteFunc); + } + + @Override + @SuppressWarnings({"checkstyle:CyclomaticComplexity", "MethodLength"}) + public void cleanFiles(Table table, List removedSnapshots) { + if (table.refs().size() > 1) { + throw new UnsupportedOperationException( + "Cannot incrementally clean files for tables with more than 1 ref"); + } + + table.refresh(); + // clean up the expired snapshots: + // 1. Get a list of the snapshots that were removed + // 2. Delete any data files that were deleted by those snapshots and are not in the table + // 3. Delete any manifests that are no longer used by current snapshots + // 4. Delete the manifest lists + + Set validIds = Sets.newHashSet(); + for (Snapshot snapshot : table.snapshots()) { + validIds.add(snapshot.snapshotId()); + } + + Set expiredIds = Sets.newHashSet(); + for (Snapshot snapshot : removedSnapshots) { + long snapshotId = snapshot.snapshotId(); + expiredIds.add(snapshotId); + } + + if (expiredIds.isEmpty()) { + // if no snapshots were expired, skip cleanup + return; + } + + SnapshotRef branchToCleanup = Iterables.getFirst(table.refs().values(), null); + if (branchToCleanup == null) { + return; + } + + Snapshot latest = table.snapshot(branchToCleanup.snapshotId()); + Iterable snapshots = table.snapshots(); + + // this is the set of ancestors of the current table state. when removing snapshots, this must + // only remove files that were deleted in an ancestor of the current table state to avoid + // physically deleting files that were logically deleted in a commit that was rolled back. + Set ancestorIds = + Sets.newHashSet(SnapshotUtil.ancestorIds(latest, snapshotLookup(removedSnapshots, table))); + + Set pickedAncestorSnapshotIds = Sets.newHashSet(); + for (long snapshotId : ancestorIds) { + String sourceSnapshotId = + table + .snapshot(snapshotId) + .summary() + .get(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP); + if (sourceSnapshotId != null) { + // protect any snapshot that was cherry-picked into the current table state + pickedAncestorSnapshotIds.add(Long.parseLong(sourceSnapshotId)); + } + } + + // find manifests to clean up that are still referenced by a valid snapshot, but written by an + // expired snapshot + Set validManifests = Sets.newHashSet(); + + // Reads and deletes are done using Tasks.foreach(...).suppressFailureWhenFinished to complete + // as much of the delete work as possible and avoid orphaned data or manifest files. + Tasks.foreach(snapshots) + .retry(3) + .suppressFailureWhenFinished() + .onFailure( + (snapshot, exc) -> + LOG.warn( + "Failed on snapshot {} while reading manifest list: {}", + snapshot.snapshotId(), + snapshot.manifestListLocation(), + exc)) + .run( + snapshot -> { + try (CloseableIterable manifests = readManifests(snapshot)) { + for (ManifestFile manifest : manifests) { + validManifests.add(manifest.path()); + } + + } catch (IOException e) { + throw new RuntimeException( + "Failed to close manifest list: " + snapshot.manifestListLocation(), e); + } + }); + + // find manifests to clean up that were only referenced by snapshots that have expired + Set manifestListsToDelete = Sets.newHashSet(); + Set manifestsToDelete = Sets.newHashSet(); + Tasks.foreach(removedSnapshots) + .retry(3) + .suppressFailureWhenFinished() + .onFailure( + (snapshot, exc) -> + LOG.warn( + "Failed on snapshot {} while reading manifest list: {}", + snapshot.snapshotId(), + snapshot.manifestListLocation(), + exc)) + .run( + snapshot -> { + long snapshotId = snapshot.snapshotId(); + if (!validIds.contains(snapshotId)) { + // determine whether the changes in this snapshot are in the current table state + if (pickedAncestorSnapshotIds.contains(snapshotId)) { + // this snapshot was cherry-picked into the current table state, so skip cleaning + // it up. + // its changes will expire when the picked snapshot expires. + // A -- C -- D (source=B) + // `- B <-- this commit + return; + } + + long sourceSnapshotId = + PropertyUtil.propertyAsLong( + snapshot.summary(), SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP, -1); + if (ancestorIds.contains(sourceSnapshotId)) { + // this commit was cherry-picked from a commit that is in the current table state. + // do not clean up its changes because it would revert data file additions that + // are in the current + // table. + // A -- B -- C + // `- D (source=B) <-- this commit + return; + } + + if (pickedAncestorSnapshotIds.contains(sourceSnapshotId)) { + // this commit was cherry-picked from a commit that is in the current table state. + // do not clean up its changes because it would revert data file additions that + // are in the current + // table. + // A -- C -- E (source=B) + // `- B `- D (source=B) <-- this commit + return; + } + + // find any manifests that are no longer needed + try (CloseableIterable manifests = readManifests(snapshot)) { + for (ManifestFile manifest : manifests) { + if (!validManifests.contains(manifest.path())) { + manifestsToDelete.add(manifest.path()); + } + } + } catch (IOException e) { + throw new RuntimeException( + "Failed to close manifest list: " + snapshot.manifestListLocation(), e); + } + + // add the manifest list to the delete set, if present + if (snapshot.manifestListLocation() != null) { + manifestListsToDelete.add(snapshot.manifestListLocation()); + } + } + }); + + deleteFiles(manifestsToDelete, "manifest"); + deleteFiles(manifestListsToDelete, "manifest list"); + + if (!table.statisticsFiles().isEmpty()) { + Set expiredStatisticsFiles = new HashSet<>(); + for (StatisticsFile statisticsFile : table.statisticsFiles()) { + if (expiredIds.contains(statisticsFile.snapshotId())) { + expiredStatisticsFiles.add(statisticsFile.path()); + } + } + deleteFiles(expiredStatisticsFiles, "statistics files"); + } + } + + private static Function snapshotLookup(List removedSnapshots, Table table) { + Map snapshotMap = new HashMap<>(); + removedSnapshots.forEach(snapshot -> snapshotMap.put(snapshot.snapshotId(), snapshot)); + table.snapshots().forEach(snapshot -> snapshotMap.put(snapshot.snapshotId(), snapshot)); + return snapshotMap::get; + } +}