Support Table History Policy in Snapshot Expiration Job #274

Merged
@@ -3,7 +3,9 @@
import com.linkedin.openhouse.jobs.client.JobsClient;
import com.linkedin.openhouse.jobs.client.TablesClient;
import com.linkedin.openhouse.jobs.client.model.JobConf;
+import com.linkedin.openhouse.jobs.util.HistoryConfig;
import com.linkedin.openhouse.jobs.util.TableMetadata;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

@@ -28,10 +30,18 @@ public JobConf.JobTypeEnum getType() {

@Override
protected List<String> getArgs() {
-    return Arrays.asList(
-        "--tableName", metadata.fqtn(),
-        "--granularity", "days",
-        "--count", "3");
+    HistoryConfig config = metadata.getHistoryConfig();
+    List<String> jobArgs = new ArrayList<>();
+    if (config == null) {
+      // History policy is @Nullable on TableMetadata; fall back to the app-side defaults.
+      return jobArgs;
+    }
+    if (config.getMaxAge() > 0) {
+      jobArgs.addAll(
+          Arrays.asList(
+              "--maxAge", Integer.toString(config.getMaxAge()),
+              "--granularity", config.getGranularity().getValue()));
+    }
+    if (config.getVersions() > 0) {
+      jobArgs.addAll(Arrays.asList("--versions", Integer.toString(config.getVersions())));
+    }
+    return jobArgs;
}

@Override
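For illustration, here is a minimal sketch of the argument list the new getArgs() builds from a table's history policy. The GranularityEnum constant name and what getValue() returns are assumptions here; the actual constants live in the tables client model.

HistoryConfig config =
    HistoryConfig.builder()
        .maxAge(7)
        .granularity(History.GranularityEnum.DAY) // hypothetical constant name
        .versions(10)
        .build();
// getArgs() would then return, in order:
// ["--maxAge", "7", "--granularity", config.getGranularity().getValue(), "--versions", "10"]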
@@ -1,5 +1,6 @@
package com.linkedin.openhouse.jobs.spark;

+import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.linkedin.openhouse.common.stats.model.IcebergTableStats;
import com.linkedin.openhouse.jobs.util.SparkJobUtil;
@@ -20,6 +21,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.ExpireSnapshots;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
@@ -201,17 +203,36 @@ public void deleteStagedOrphanDirectory(
}

/** Expire snapshots on a given fully-qualified table name. */
-  public void expireSnapshots(String fqtn, long expireBeforeTimestampMs) {
-    expireSnapshots(getTable(fqtn), expireBeforeTimestampMs);
+  public void expireSnapshots(String fqtn, int maxAge, String granularity, int versions) {
+    expireSnapshots(getTable(fqtn), maxAge, granularity, versions);
}

-  /** Expire snapshots on a given {@link Table}. */
-  public void expireSnapshots(Table table, long expireBeforeTimestampMs) {
-    table
-        .expireSnapshots()
-        .cleanExpiredFiles(false)
-        .expireOlderThan(expireBeforeTimestampMs)
-        .commit();
+  /**
+   * Expire snapshots on a given {@link Table}. If maxAge is provided, snapshots older than maxAge
+   * (in the given granularity time unit) are expired. If versions is provided, the last versions
+   * snapshots are retained. If both are provided, maxAge takes priority: at most versions
+   * snapshots are retained, all of them younger than maxAge.
+   */
+  public void expireSnapshots(Table table, int maxAge, String granularity, int versions) {
+    ExpireSnapshots expireSnapshotsCommand = table.expireSnapshots().cleanExpiredFiles(false);
+
+    // maxAge is always defined together with granularity
+    if (!granularity.isEmpty()) {
+      TimeUnit timeUnitGranularity = TimeUnit.valueOf(granularity.toUpperCase());
+      long expireBeforeTimestampMs =
+          System.currentTimeMillis() - timeUnitGranularity.toMillis(maxAge);
+      log.info(
+          "Expiring snapshots for table: {} older than timestamp {}ms",
+          table,
+          expireBeforeTimestampMs);
+      expireSnapshotsCommand.expireOlderThan(expireBeforeTimestampMs).commit();
+    }
+    if (versions > 0 && Iterators.size(table.snapshots().iterator()) > versions) {
+      log.info("Expiring snapshots for table: {} retaining last {} versions", table, versions);
+      // Note: retainLast only limits snapshots that are otherwise eligible for expiration, hence
+      // expireOlderThan(currentTime) to make every snapshot a candidate first.
+      expireSnapshotsCommand
+          .expireOlderThan(System.currentTimeMillis())
+          .retainLast(versions)
+          .commit();
+    }
+  }
}

/**
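As a usage sketch of the new overload (the table name and values below are placeholders), expiration can be driven by age, by version count, or by both; the granularity string must name a java.util.concurrent.TimeUnit constant such as "days".

// Age-based only: expire snapshots older than 3 days.
ops.expireSnapshots("db.tbl", 3, "days", 0);
// Version-based only: keep the 10 most recent snapshots regardless of age.
ops.expireSnapshots("db.tbl", 0, "", 10);
// Both: at most 10 snapshots survive, all of them younger than 3 days.
ops.expireSnapshots("db.tbl", 3, "days", 10);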
@@ -3,7 +3,6 @@
import com.linkedin.openhouse.jobs.spark.state.StateManager;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
@@ -18,41 +17,68 @@
@Slf4j
public class SnapshotsExpirationSparkApp extends BaseTableSparkApp {
private final String granularity;
-  private final int count;
+  private final int maxAge;
+  private final int versions;

+  public static class DEFAULT_CONFIGURATION {
+    public static final String MAX_AGE = "3";
+    public static final String GRANULARITY = "DAYS";
+    // By default versions is not defined; snapshots are retained based on max age only.
+    public static final String VERSIONS = "0";
+  }

public SnapshotsExpirationSparkApp(
-      String jobId, StateManager stateManager, String fqtn, String granularity, int count) {
+      String jobId,
+      StateManager stateManager,
+      String fqtn,
+      int maxAge,
+      String granularity,
+      int versions) {
super(jobId, stateManager, fqtn);
-    this.granularity = granularity;
-    this.count = count;
+    if (maxAge == 0 && versions == 0) {
+      this.maxAge = Integer.parseInt(DEFAULT_CONFIGURATION.MAX_AGE);
+      this.granularity = DEFAULT_CONFIGURATION.GRANULARITY;
+      this.versions = Integer.parseInt(DEFAULT_CONFIGURATION.VERSIONS);
+    } else {
+      this.granularity = granularity;
+      this.maxAge = maxAge;
+      this.versions = versions;
+    }
}

@Override
protected void runInner(Operations ops) {
log.info(
"Snapshot expiration app start for table {}, expiring older than {} {}s",
"Snapshot expiration app start for table {}, expiring older than {} {}s or with more than {} versions",
fqtn,
-        count,
-        granularity);
-    long expireBeforeTimestampMs = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(count);
-    log.info("Expire snapshots before timestamp ms {}", expireBeforeTimestampMs);
-    ops.expireSnapshots(fqtn, expireBeforeTimestampMs);
+        maxAge,
+        granularity,
+        versions);
+    ops.expireSnapshots(fqtn, maxAge, granularity, versions);
}

public static void main(String[] args) {
List<Option> extraOptions = new ArrayList<>();
extraOptions.add(new Option("t", "tableName", true, "Fully-qualified table name"));
+    extraOptions.add(
+        new Option("a", "maxAge", true, "Delete snapshots older than <maxAge> <granularity>s"));
     extraOptions.add(new Option("g", "granularity", true, "Granularity: days"));
     extraOptions.add(
-        new Option("c", "count", true, "Delete snapshots older than <count> <granularity>s"));
+        new Option("v", "versions", true, "Number of versions to keep after snapshot expiration"));
CommandLine cmdLine = createCommandLine(args, extraOptions);

SnapshotsExpirationSparkApp app =
new SnapshotsExpirationSparkApp(
getJobId(cmdLine),
createStateManager(cmdLine),
cmdLine.getOptionValue("tableName"),
cmdLine.getOptionValue("granularity"),
Integer.parseInt(cmdLine.getOptionValue("count")));
Integer.parseInt(cmdLine.getOptionValue("maxAge", "0")),
cmdLine.getOptionValue("granularity", ""),
Integer.parseInt(cmdLine.getOptionValue("minVersions", "0")));
app.run();
}
}
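A hedged sketch of invoking the app directly with the new flags; the table name and values are placeholders, and any additional options required by createCommandLine and createStateManager (for example state-manager settings) are omitted.

SnapshotsExpirationSparkApp.main(
    new String[] {
      "--tableName", "db.tbl",
      "--maxAge", "3",
      "--granularity", "days",
      "--versions", "10"
    });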
@@ -0,0 +1,18 @@
package com.linkedin.openhouse.jobs.util;

import com.linkedin.openhouse.tables.client.model.History;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;

/** History Policy config class. This is app side representation of /tables policies->history */
@Builder
@Getter
@EqualsAndHashCode
@ToString
public class HistoryConfig {
private final int maxAge;
private final int versions;
private final History.GranularityEnum granularity;
}
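Since the class is annotated with Lombok's @Builder and @Getter, the job code above relies on the generated surface, roughly as sketched below (the GranularityEnum constant is again a hypothetical name).

HistoryConfig cfg =
    HistoryConfig.builder()
        .maxAge(3)
        .granularity(History.GranularityEnum.DAY) // hypothetical constant name
        .versions(0)
        .build();
int maxAge = cfg.getMaxAge();       // 3
int versions = cfg.getVersions();   // 0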
@@ -24,6 +24,7 @@ public class TableMetadata extends Metadata {
protected boolean isClustered;
@Builder.Default protected @NonNull Map<String, String> jobExecutionProperties = new HashMap<>();
protected @Nullable RetentionConfig retentionConfig;
+  protected @Nullable HistoryConfig historyConfig;

public String fqtn() {
return String.format("%s.%s", dbName, tableName);
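Assuming TableMetadata exposes a Lombok builder (the @Builder.Default above implies one), attaching a history policy might look like the sketch below; the builder method names and the omission of other required fields are assumptions.

TableMetadata metadata =
    TableMetadata.builder()
        .dbName("db")
        .tableName("tbl")
        .historyConfig(
            HistoryConfig.builder().maxAge(3).granularity(History.GranularityEnum.DAY).build())
        .build();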