From 7da279d54f018b549a1e9bffe3cae677a68d147e Mon Sep 17 00:00:00 2001 From: Will-Lo Date: Mon, 23 Dec 2024 18:31:48 -0500 Subject: [PATCH] Adds support in snapshot expiration job to use history policy configurations --- .../tasks/TableSnapshotsExpirationTask.java | 4 +- .../openhouse/jobs/spark/Operations.java | 53 ++++++----- .../spark/SnapshotsExpirationSparkApp.java | 45 +++++---- .../openhouse/jobs/util/HistoryConfig.java | 6 +- .../openhouse/jobs/spark/OperationsTest.java | 95 ++++++++++++++++--- 5 files changed, 143 insertions(+), 60 deletions(-) diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/TableSnapshotsExpirationTask.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/TableSnapshotsExpirationTask.java index 8a32eb87..a543a113 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/TableSnapshotsExpirationTask.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/TableSnapshotsExpirationTask.java @@ -38,8 +38,8 @@ protected List getArgs() { "--maxAge", Integer.toString(config.getMaxAge()), "--granularity", config.getGranularity().getValue())); } - if (config.getMinVersions() > 0) { - jobArgs.addAll(Arrays.asList("--minVersions", Integer.toString(config.getMinVersions()))); + if (config.getVersions() > 0) { + jobArgs.addAll(Arrays.asList("--versions", Integer.toString(config.getVersions()))); } return jobArgs; } diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java index 7a0ad471..c13b3eed 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java @@ -1,6 +1,5 @@ package com.linkedin.openhouse.jobs.spark; -import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import 
com.linkedin.openhouse.common.stats.model.IcebergTableStats; import com.linkedin.openhouse.jobs.util.SparkJobUtil; @@ -21,6 +20,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.ExpireSnapshots; import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; @@ -202,34 +202,35 @@ public void deleteStagedOrphanDirectory( } /** Expire snapshots on a given fully-qualified table name. */ - public void expireSnapshots(String fqtn, long expireBeforeTimestampMs, int minVersions) { - expireSnapshots(getTable(fqtn), expireBeforeTimestampMs, minVersions); + public void expireSnapshots(String fqtn, int maxAge, String granularity, int versions) { + expireSnapshots(getTable(fqtn), maxAge, granularity, versions); } - /** Expire snapshots on a given {@link Table}. */ - public void expireSnapshots(Table table, long expireBeforeTimestampMs, int minVersions) { - // min versions to keep not defined, expire based on timestamp of snapshots - if (minVersions == 0) { - log.info( - "Expiring snapshots for table: {} before timestamp: {}", table, expireBeforeTimestampMs); - table - .expireSnapshots() - .cleanExpiredFiles(false) - .expireOlderThan(expireBeforeTimestampMs) - .commit(); - } else if (Iterators.size(table.snapshots().iterator()) > minVersions) { - log.info("Expiring snapshots for table: {} beyond last {} versions", table, minVersions); - table - .expireSnapshots() - .cleanExpiredFiles(false) - .expireOlderThan(expireBeforeTimestampMs) - .retainLast(minVersions) + /** + * Expire snapshots on a given {@link Table}. If maxAge is provided, it will expire snapshots + * older than maxAge milliseconds. If versions is provided, it will retain the last versions + * snapshots. 
If both are provided, it will prioritize maxAge - only retain up to versions number + * of snapshots younger than the maxAge + */ + public void expireSnapshots(Table table, int maxAge, String granularity, int versions) { + ExpireSnapshots expireSnapshotsCommand = table.expireSnapshots().cleanExpiredFiles(false); + + // maxAge is always defined with granularity + if (!granularity.isEmpty()) { + TimeUnit timeUnitGranularity = TimeUnit.valueOf(granularity.toUpperCase()); + long expireBeforeTimestampMs = + System.currentTimeMillis() - timeUnitGranularity.toMillis(maxAge); + log.info("Expiring snapshots for table: {} older than {}ms", table, expireBeforeTimestampMs); + expireSnapshotsCommand.expireOlderThan(expireBeforeTimestampMs).commit(); + } + if (versions > 0) { + log.info("Expiring snapshots for table: {} retaining last {} versions", table, versions); + // Note: retainLast keeps the last N snapshots that WOULD be expired, hence expireOlderThan + // currentTime + expireSnapshotsCommand + .expireOlderThan(System.currentTimeMillis()) + .retainLast(versions) .commit(); - } else { - log.warn( - "Table {} has less than {} minimum configured versions, skipping snapshot expiration", - table, - minVersions); } } diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/SnapshotsExpirationSparkApp.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/SnapshotsExpirationSparkApp.java index 882756b7..b858db8d 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/SnapshotsExpirationSparkApp.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/SnapshotsExpirationSparkApp.java @@ -3,7 +3,6 @@ import com.linkedin.openhouse.jobs.spark.state.StateManager; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; @@ -19,14 +18,18 @@ public class SnapshotsExpirationSparkApp 
extends BaseTableSparkApp { private final String granularity; private final int maxAge; - private final int minVersions; + private final int versions; - private static final String DEFAULT_MAX_AGE = "3"; + public static class DEFAULT_CONFIGURATION { + public static final String MAX_AGE = "3"; + public static final String GRANULARITY = "DAYS"; + public static final String VERSIONS = "0"; + } - private static final String DEFAULT_GRANULARITY = "days"; + private static final String DEFAULT_GRANULARITY = ""; - // Default do not define min versions, only retain snapshots based on max age - private static final String DEFAULT_MIN_VERSIONS = "0"; + // By default do not define versions, and only retain snapshots based on max age + private static final String DEFAULT_VERSIONS = "0"; public SnapshotsExpirationSparkApp( String jobId, @@ -34,11 +37,17 @@ public SnapshotsExpirationSparkApp( String fqtn, int maxAge, String granularity, - int minVersions) { + int versions) { super(jobId, stateManager, fqtn); - this.granularity = granularity; - this.maxAge = maxAge; - this.minVersions = minVersions; + if (maxAge == 0 && versions == 0) { + this.maxAge = Integer.parseInt(DEFAULT_CONFIGURATION.MAX_AGE); + this.granularity = DEFAULT_CONFIGURATION.GRANULARITY; + this.versions = Integer.parseInt(DEFAULT_CONFIGURATION.VERSIONS); + } else { + this.granularity = granularity; + this.maxAge = maxAge; + this.versions = versions; + } } @Override @@ -48,10 +57,8 @@ protected void runInner(Operations ops) { fqtn, maxAge, granularity, - minVersions); - long expireBeforeTimestampMs = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(maxAge); - log.info("Expire snapshots before timestamp ms {}", expireBeforeTimestampMs); - ops.expireSnapshots(fqtn, expireBeforeTimestampMs, minVersions); + versions); + ops.expireSnapshots(fqtn, maxAge, granularity, versions); } public static void main(String[] args) { @@ -60,16 +67,18 @@ public static void main(String[] args) { extraOptions.add( new Option("a", 
"maxAge", true, "Delete snapshots older than s")); extraOptions.add(new Option("g", "granularity", true, "Granularity: day")); - extraOptions.add(new Option("v", "minVersions", true, "Minimum number of versions to keep")); + extraOptions.add( + new Option("v", "versions", true, "Number of versions to keep after snapshot expiration")); CommandLine cmdLine = createCommandLine(args, extraOptions); + SnapshotsExpirationSparkApp app = new SnapshotsExpirationSparkApp( getJobId(cmdLine), createStateManager(cmdLine), cmdLine.getOptionValue("tableName"), - Integer.parseInt(cmdLine.getOptionValue("maxAge", DEFAULT_MAX_AGE)), - cmdLine.getOptionValue("granularity", DEFAULT_GRANULARITY), - Integer.parseInt(cmdLine.getOptionValue("minVersions", DEFAULT_MIN_VERSIONS))); + Integer.parseInt(cmdLine.getOptionValue("maxAge", "0")), + cmdLine.getOptionValue("granularity", ""), + Integer.parseInt(cmdLine.getOptionValue("versions", "0"))); app.run(); } } diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/HistoryConfig.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/HistoryConfig.java index fe649fc2..aa27816f 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/HistoryConfig.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/HistoryConfig.java @@ -1,6 +1,6 @@ package com.linkedin.openhouse.jobs.util; -import com.linkedin.openhouse.tables.client.model.Retention; +import com.linkedin.openhouse.tables.client.model.History; import lombok.Builder; import lombok.EqualsAndHashCode; import lombok.Getter; @@ -13,6 +13,6 @@ @ToString public class HistoryConfig { private final int maxAge; - private final int minVersions; - private final Retention.GranularityEnum granularity; + private final int versions; + private final History.GranularityEnum granularity; } diff --git a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OperationsTest.java
b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OperationsTest.java index 164d2639..64857989 100644 --- a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OperationsTest.java +++ b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OperationsTest.java @@ -29,8 +29,6 @@ import org.apache.iceberg.types.Types; import org.apache.spark.sql.Row; import org.assertj.core.util.Lists; -import org.joda.time.DurationFieldType; -import org.joda.time.Hours; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -275,8 +273,44 @@ public void testOrphanFilesDeletionNoStaging() throws Exception { @Test public void testSnapshotsExpirationMaxAge() throws Exception { + final String tableName = "db.test_es_maxage_noop_java"; + final int numInserts = 3; + final int maxAge = 0; + // Not a realistic time setting that is accepted by the SQL, but tests that other snapshots are + // deleted + final String timeGranularity = "DAYS"; + + List snapshotIds; + try (Operations ops = Operations.withCatalog(getSparkSession(), meter)) { + prepareTable(ops, tableName); + populateTable(ops, tableName, numInserts); + snapshotIds = getSnapshotIds(ops, tableName); + Assertions.assertEquals( + numInserts, + snapshotIds.size(), + String.format("There must be %d snapshot(s) after inserts", numInserts)); + Table table = ops.getTable(tableName); + log.info("Loaded table {}, location {}", table.name(), table.location()); + + ops.expireSnapshots(table, maxAge, timeGranularity, 0); + // No snapshots should be cleaned up as they are all within the max age + checkSnapshots(table, snapshotIds.subList(snapshotIds.size() - 1, snapshotIds.size())); + } + // restart the app to reload catalog cache + try (Operations ops = Operations.withCatalog(getSparkSession(), meter)) { + // verify that new apps see snapshots correctly + checkSnapshots( + ops, tableName, snapshotIds.subList(snapshotIds.size() - 1, snapshotIds.size())); + } + } + + @Test + public void 
testSnapshotsExpirationMaxAgeNoop() throws Exception { final String tableName = "db.test_es_maxage_java"; final int numInserts = 3; + final int maxAge = 3; + final String timeGranularity = "DAYS"; + List snapshotIds; try (Operations ops = Operations.withCatalog(getSparkSession(), meter)) { prepareTable(ops, tableName); @@ -288,21 +322,57 @@ public void testSnapshotsExpirationMaxAge() throws Exception { String.format("There must be %d snapshot(s) after inserts", numInserts)); Table table = ops.getTable(tableName); log.info("Loaded table {}, location {}", table.name(), table.location()); - ops.expireSnapshots(table, System.currentTimeMillis(), 3); + + ops.expireSnapshots(table, maxAge, timeGranularity, 0); + // No snapshots should be cleaned up as they are all within the max age + checkSnapshots(table, snapshotIds); + } + // restart the app to reload catalog cache + try (Operations ops = Operations.withCatalog(getSparkSession(), meter)) { + // verify that new apps see snapshots correctly + checkSnapshots(ops, tableName, snapshotIds); + } + } + + @Test + public void testSnapshotsExpirationVersions() throws Exception { + final String tableName = "db.test_es_versions_java"; + final int numInserts = 3; + final int versionsToKeep = 2; + List snapshotIds; + try (Operations ops = Operations.withCatalog(getSparkSession(), meter)) { + prepareTable(ops, tableName); + populateTable(ops, tableName, numInserts); + snapshotIds = getSnapshotIds(ops, tableName); + Assertions.assertEquals( + numInserts, + snapshotIds.size(), + String.format("There must be %d snapshot(s) after inserts", numInserts)); + Table table = ops.getTable(tableName); + log.info("Loaded table {}, location {}", table.name(), table.location()); + + ops.expireSnapshots(table, 0, "", versionsToKeep); // verify that table object snapshots are updated - checkSnapshots(table, snapshotIds.subList(2, snapshotIds.size())); + checkSnapshots( + table, snapshotIds.subList(snapshotIds.size() - versionsToKeep, 
snapshotIds.size())); } // restart the app to reload catalog cache try (Operations ops = Operations.withCatalog(getSparkSession(), meter)) { // verify that new apps see snapshots correctly - checkSnapshots(ops, tableName, snapshotIds.subList(2, snapshotIds.size())); + checkSnapshots( + ops, + tableName, + snapshotIds.subList(snapshotIds.size() - versionsToKeep, snapshotIds.size())); } } @Test - public void testSnapshotsExpirationMinVersions() throws Exception { - final String tableName = "db.test_es_minversions_java"; + public void testSnapshotsExpirationBothAgeAndVersions() throws Exception { + final String tableName = "db.test_es_age_and_versions_java"; final int numInserts = 3; + final int maxAge = 3; + final String timeGranularity = "DAYS"; + final int versionsToKeep = 1; List snapshotIds; try (Operations ops = Operations.withCatalog(getSparkSession(), meter)) { prepareTable(ops, tableName); @@ -315,15 +385,18 @@ public void testSnapshotsExpirationMinVersions() throws Exception { Table table = ops.getTable(tableName); log.info("Loaded table {}, location {}", table.name(), table.location()); - ops.expireSnapshots( - table, System.currentTimeMillis() + Hours.ONE.get(DurationFieldType.millis()), 1); + ops.expireSnapshots(table, maxAge, timeGranularity, versionsToKeep); // verify that table object snapshots are updated - checkSnapshots(table, snapshotIds.subList(2, snapshotIds.size())); + checkSnapshots( + table, snapshotIds.subList(snapshotIds.size() - versionsToKeep, snapshotIds.size())); } // restart the app to reload catalog cache try (Operations ops = Operations.withCatalog(getSparkSession(), meter)) { // verify that new apps see snapshots correctly - checkSnapshots(ops, tableName, snapshotIds.subList(2, snapshotIds.size())); + checkSnapshots( + ops, + tableName, + snapshotIds.subList(snapshotIds.size() - versionsToKeep, snapshotIds.size())); } }