Skip to content

Commit

Permalink
Adds support in snapshot expiration job to use history policy configurations
Browse files Browse the repository at this point in the history
  • Loading branch information
Will-Lo committed Dec 23, 2024
1 parent 347e4fd commit 7da279d
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ protected List<String> getArgs() {
"--maxAge", Integer.toString(config.getMaxAge()),
"--granularity", config.getGranularity().getValue()));
}
if (config.getMinVersions() > 0) {
jobArgs.addAll(Arrays.asList("--minVersions", Integer.toString(config.getMinVersions())));
if (config.getVersions() > 0) {
jobArgs.addAll(Arrays.asList("--versions", Integer.toString(config.getVersions())));
}
return jobArgs;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.linkedin.openhouse.jobs.spark;

import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.linkedin.openhouse.common.stats.model.IcebergTableStats;
import com.linkedin.openhouse.jobs.util.SparkJobUtil;
Expand All @@ -21,6 +20,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.ExpireSnapshots;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
Expand Down Expand Up @@ -202,34 +202,35 @@ public void deleteStagedOrphanDirectory(
}

/** Expire snapshots on a given fully-qualified table name. */
public void expireSnapshots(String fqtn, long expireBeforeTimestampMs, int minVersions) {
expireSnapshots(getTable(fqtn), expireBeforeTimestampMs, minVersions);
public void expireSnapshots(String fqtn, int maxAge, String granularity, int versions) {
expireSnapshots(getTable(fqtn), maxAge, granularity, versions);
}

/** Expire snapshots on a given {@link Table}. */
public void expireSnapshots(Table table, long expireBeforeTimestampMs, int minVersions) {
// min versions to keep not defined, expire based on timestamp of snapshots
if (minVersions == 0) {
log.info(
"Expiring snapshots for table: {} before timestamp: {}", table, expireBeforeTimestampMs);
table
.expireSnapshots()
.cleanExpiredFiles(false)
.expireOlderThan(expireBeforeTimestampMs)
.commit();
} else if (Iterators.size(table.snapshots().iterator()) > minVersions) {
log.info("Expiring snapshots for table: {} beyond last {} versions", table, minVersions);
table
.expireSnapshots()
.cleanExpiredFiles(false)
.expireOlderThan(expireBeforeTimestampMs)
.retainLast(minVersions)
/**
* Expire snapshots on a given {@link Table}. If maxAge is provided, it will expire snapshots
* older than maxAge milliseconds. If versions is provided, it will retain the last versions
* snapshots. If both are provided, it will prioritize maxAge - only retain up to versions number
* of snapshots younger than the maxAge
*/
public void expireSnapshots(Table table, int maxAge, String granularity, int versions) {
ExpireSnapshots expireSnapshotsCommand = table.expireSnapshots().cleanExpiredFiles(false);

// maxAge is always defined with granularity
if (!granularity.isEmpty()) {
TimeUnit timeUnitGranularity = TimeUnit.valueOf(granularity.toUpperCase());
long expireBeforeTimestampMs =
System.currentTimeMillis() - timeUnitGranularity.toMillis(maxAge);
log.info("Expiring snapshots for table: {} older than {}ms", table, expireBeforeTimestampMs);
expireSnapshotsCommand.expireOlderThan(expireBeforeTimestampMs).commit();
}
if (versions > 0) {
log.info("Expiring snapshots for table: {} retaining last {} versions", table, versions);
// Note: retainLast keeps the last N snapshots that WOULD be expired, hence expireOlderThan
// currentTime
expireSnapshotsCommand
.expireOlderThan(System.currentTimeMillis())
.retainLast(versions)
.commit();
} else {
log.warn(
"Table {} has less than {} minimum configured versions, skipping snapshot expiration",
table,
minVersions);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.linkedin.openhouse.jobs.spark.state.StateManager;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
Expand All @@ -19,26 +18,36 @@
public class SnapshotsExpirationSparkApp extends BaseTableSparkApp {
private final String granularity;
private final int maxAge;
private final int minVersions;
private final int versions;

private static final String DEFAULT_MAX_AGE = "3";
public static class DEFAULT_CONFIGURATION {
public static final String MAX_AGE = "3";
public static final String GRANULARITY = "DAYS";
public static final String VERSIONS = "0";
}

private static final String DEFAULT_GRANULARITY = "days";
private static final String DEFAULT_GRANULARITY = "";

// Default do not define min versions, only retain snapshots based on max age
private static final String DEFAULT_MIN_VERSIONS = "0";
// By default do not define versions, and only retain snapshots based on max age
private static final String DEFAULT_VERSIONS = "0";

/**
 * Creates the snapshot expiration app for a single table.
 *
 * <p>When neither maxAge nor versions is configured (both 0), falls back to the defaults in
 * {@code DEFAULT_CONFIGURATION} (age-based expiry of 3 DAYS); otherwise uses the supplied
 * values as-is.
 *
 * @param jobId job identifier
 * @param stateManager state manager for job bookkeeping
 * @param fqtn fully-qualified table name
 * @param maxAge age threshold in units of {@code granularity}; 0 means unset
 * @param granularity time unit name for maxAge (e.g. "DAYS"); empty means unset
 * @param versions number of most-recent snapshots to retain; 0 means unset
 */
public SnapshotsExpirationSparkApp(
    String jobId,
    StateManager stateManager,
    String fqtn,
    int maxAge,
    String granularity,
    int versions) {
  super(jobId, stateManager, fqtn);
  if (maxAge == 0 && versions == 0) {
    // No policy supplied: fall back to the default age-based expiration
    this.maxAge = Integer.parseInt(DEFAULT_CONFIGURATION.MAX_AGE);
    this.granularity = DEFAULT_CONFIGURATION.GRANULARITY;
    this.versions = Integer.parseInt(DEFAULT_CONFIGURATION.VERSIONS);
  } else {
    this.granularity = granularity;
    this.maxAge = maxAge;
    this.versions = versions;
  }
}

@Override
Expand All @@ -48,10 +57,8 @@ protected void runInner(Operations ops) {
fqtn,
maxAge,
granularity,
minVersions);
long expireBeforeTimestampMs = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(maxAge);
log.info("Expire snapshots before timestamp ms {}", expireBeforeTimestampMs);
ops.expireSnapshots(fqtn, expireBeforeTimestampMs, minVersions);
versions);
ops.expireSnapshots(fqtn, maxAge, granularity, versions);
}

public static void main(String[] args) {
Expand All @@ -60,16 +67,18 @@ public static void main(String[] args) {
extraOptions.add(
new Option("a", "maxAge", true, "Delete snapshots older than <maxAge> <granularity>s"));
extraOptions.add(new Option("g", "granularity", true, "Granularity: day"));
extraOptions.add(new Option("v", "minVersions", true, "Minimum number of versions to keep"));
extraOptions.add(
new Option("v", "versions", true, "Number of versions to keep after snapshot expiration"));
CommandLine cmdLine = createCommandLine(args, extraOptions);

SnapshotsExpirationSparkApp app =
new SnapshotsExpirationSparkApp(
getJobId(cmdLine),
createStateManager(cmdLine),
cmdLine.getOptionValue("tableName"),
Integer.parseInt(cmdLine.getOptionValue("maxAge", DEFAULT_MAX_AGE)),
cmdLine.getOptionValue("granularity", DEFAULT_GRANULARITY),
Integer.parseInt(cmdLine.getOptionValue("minVersions", DEFAULT_MIN_VERSIONS)));
Integer.parseInt(cmdLine.getOptionValue("maxAge", "0")),
cmdLine.getOptionValue("granularity", ""),
Integer.parseInt(cmdLine.getOptionValue("minVersions", "0")));
app.run();
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.linkedin.openhouse.jobs.util;

import com.linkedin.openhouse.tables.client.model.Retention;
import com.linkedin.openhouse.tables.client.model.History;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
Expand All @@ -13,6 +13,6 @@
@ToString
public class HistoryConfig {
private final int maxAge;
private final int minVersions;
private final Retention.GranularityEnum granularity;
private final int versions;
private final History.GranularityEnum granularity;
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Row;
import org.assertj.core.util.Lists;
import org.joda.time.DurationFieldType;
import org.joda.time.Hours;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -275,8 +273,44 @@ public void testOrphanFilesDeletionNoStaging() throws Exception {

@Test
public void testSnapshotsExpirationMaxAge() throws Exception {
// NOTE(review): name contains "noop" but this test DOES expire snapshots; looks swapped with
// testSnapshotsExpirationMaxAgeNoop's table name — confirm before renaming (risk of collision)
final String tableName = "db.test_es_maxage_noop_java";
final int numInserts = 3;
final int maxAge = 0;
// maxAge of 0 is not a realistic policy value, but it places the cutoff at "now" so that all
// existing snapshots are candidates for expiration
final String timeGranularity = "DAYS";

List<Long> snapshotIds;
try (Operations ops = Operations.withCatalog(getSparkSession(), meter)) {
prepareTable(ops, tableName);
populateTable(ops, tableName, numInserts);
snapshotIds = getSnapshotIds(ops, tableName);
Assertions.assertEquals(
numInserts,
snapshotIds.size(),
String.format("There must be %d snapshot(s) after inserts", numInserts));
Table table = ops.getTable(tableName);
log.info("Loaded table {}, location {}", table.name(), table.location());

ops.expireSnapshots(table, maxAge, timeGranularity, 0);
// All snapshots fall before the cutoff; only the latest should remain (Iceberg's
// expireOlderThan retains the current snapshot), matching the subList(size-1, size) check
checkSnapshots(table, snapshotIds.subList(snapshotIds.size() - 1, snapshotIds.size()));
}
// restart the app to reload catalog cache
try (Operations ops = Operations.withCatalog(getSparkSession(), meter)) {
// verify that new apps see snapshots correctly
checkSnapshots(
ops, tableName, snapshotIds.subList(snapshotIds.size() - 1, snapshotIds.size()));
}
}

@Test
public void testSnapshotsExpirationMaxAgeNoop() throws Exception {
final String tableName = "db.test_es_maxage_java";
final int numInserts = 3;
final int maxAge = 3;
final String timeGranularity = "DAYS";

List<Long> snapshotIds;
try (Operations ops = Operations.withCatalog(getSparkSession(), meter)) {
prepareTable(ops, tableName);
Expand All @@ -288,21 +322,57 @@ public void testSnapshotsExpirationMaxAge() throws Exception {
String.format("There must be %d snapshot(s) after inserts", numInserts));
Table table = ops.getTable(tableName);
log.info("Loaded table {}, location {}", table.name(), table.location());
ops.expireSnapshots(table, System.currentTimeMillis(), 3);

ops.expireSnapshots(table, maxAge, timeGranularity, 0);
// No snapshots should be cleaned up as they are all within the max age
checkSnapshots(table, snapshotIds);
}
// restart the app to reload catalog cache
try (Operations ops = Operations.withCatalog(getSparkSession(), meter)) {
// verify that new apps see snapshots correctly
checkSnapshots(ops, tableName, snapshotIds);
}
}

@Test
public void testSnapshotsExpirationVersions() throws Exception {
  // Verifies count-based expiration: with no age policy (maxAge=0, empty granularity), only the
  // last `versionsToKeep` snapshots should survive.
  final String tableName = "db.test_es_versions_java";
  final int numInserts = 3;
  final int versionsToKeep = 2;
  List<Long> snapshotIds;
  try (Operations ops = Operations.withCatalog(getSparkSession(), meter)) {
    prepareTable(ops, tableName);
    populateTable(ops, tableName, numInserts);
    snapshotIds = getSnapshotIds(ops, tableName);
    Assertions.assertEquals(
        numInserts,
        snapshotIds.size(),
        String.format("There must be %d snapshot(s) after inserts", numInserts));
    Table table = ops.getTable(tableName);
    log.info("Loaded table {}, location {}", table.name(), table.location());

    ops.expireSnapshots(table, 0, "", versionsToKeep);
    // verify that table object snapshots are updated
    checkSnapshots(
        table, snapshotIds.subList(snapshotIds.size() - versionsToKeep, snapshotIds.size()));
  }
  // restart the app to reload catalog cache
  try (Operations ops = Operations.withCatalog(getSparkSession(), meter)) {
    // verify that new apps see snapshots correctly
    checkSnapshots(
        ops,
        tableName,
        snapshotIds.subList(snapshotIds.size() - versionsToKeep, snapshotIds.size()));
  }
}

@Test
public void testSnapshotsExpirationMinVersions() throws Exception {
final String tableName = "db.test_es_minversions_java";
public void testSnapshotsExpirationBothAgeAndVersions() throws Exception {
final String tableName = "db.test_es_age_and_versions_java";
final int numInserts = 3;
final int maxAge = 3;
final String timeGranularity = "DAYS";
final int versionsToKeep = 1;
List<Long> snapshotIds;
try (Operations ops = Operations.withCatalog(getSparkSession(), meter)) {
prepareTable(ops, tableName);
Expand All @@ -315,15 +385,18 @@ public void testSnapshotsExpirationMinVersions() throws Exception {
Table table = ops.getTable(tableName);
log.info("Loaded table {}, location {}", table.name(), table.location());

ops.expireSnapshots(
table, System.currentTimeMillis() + Hours.ONE.get(DurationFieldType.millis()), 1);
ops.expireSnapshots(table, maxAge, timeGranularity, versionsToKeep);
// verify that table object snapshots are updated
checkSnapshots(table, snapshotIds.subList(2, snapshotIds.size()));
checkSnapshots(
table, snapshotIds.subList(snapshotIds.size() - versionsToKeep, snapshotIds.size()));
}
// restart the app to reload catalog cache
try (Operations ops = Operations.withCatalog(getSparkSession(), meter)) {
// verify that new apps see snapshots correctly
checkSnapshots(ops, tableName, snapshotIds.subList(2, snapshotIds.size()));
checkSnapshots(
ops,
tableName,
snapshotIds.subList(snapshotIds.size() - versionsToKeep, snapshotIds.size()));
}
}

Expand Down

0 comments on commit 7da279d

Please sign in to comment.