Skip to content

Commit 3e8d387

Browse files
authored
Support Table History Policy in Snapshot Expiration Job (#274)
## Summary <!--- HINT: Replace #nnn with corresponding Issue number, if you are fixing an existing issue --> [[Issue](https://github.com/linkedin/openhouse/issues/#nnn)] Briefly discuss the summary of the changes made in this pull request in 2-3 lines. Following up from #262 and #259 this PR adds support for snapshot expiration table maintenance job to use the history policies defined. Most notably snapshot expiration will follow the settings of `maxAge`, `granularity`, and `versions` as follows: 1. If maxAge is provided, it will expire snapshots older than maxAge in granularity timeunit. 2. If versions is provided, it will retain the last versions snapshots regardless of their age. 3. If both are provided, it will prioritize maxAge; only retain up to versions number of snapshots younger than the maxAge. This is done by pruning the snapshots older than maxAge, and then running a second expiration to keep N versions after that. Note: If versions are defined and there are fewer than N versions in the history, then there were not enough commits (within that timespan if defined). Snapshot expiration will always keep at least 1 version. The default behavior of snapshot expiration job will remain the same, keep snapshots within the last 3 days. ## Changes - [ ] Client-facing API Changes - [ ] Internal API Changes - [ ] Bug Fixes - [ ] New Features - [ ] Performance Improvements - [ ] Code Style - [ ] Refactoring - [ ] Documentation - [ ] Tests For all the boxes checked, please include additional details of the changes made in this pull request. ## Testing Done <!--- Check any relevant boxes with "x" --> - [ ] Manually Tested on local docker setup. Please include commands ran, and their output. - [x] Added new tests for the changes made. - [x] Updated existing tests to reflect the changes made. - [ ] No tests added or updated. Please explain why. If unsure, please feel free to ask for help. - [ ] Some other form of testing like staging or soak time in production. 
Please explain. For all the boxes checked, include a detailed description of the testing done for the changes made in this pull request. # Additional Information - [ ] Breaking Changes - [ ] Deprecations - [ ] Large PR broken into smaller PRs, and PR plan linked in the description. For all the boxes checked, include additional details of the changes made in this pull request.
1 parent f12fb15 commit 3e8d387

File tree

6 files changed

+280
-32
lines changed

6 files changed

+280
-32
lines changed

apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/TableSnapshotsExpirationTask.java

+14-4
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
import com.linkedin.openhouse.jobs.client.JobsClient;
44
import com.linkedin.openhouse.jobs.client.TablesClient;
55
import com.linkedin.openhouse.jobs.client.model.JobConf;
6+
import com.linkedin.openhouse.jobs.util.HistoryConfig;
67
import com.linkedin.openhouse.jobs.util.TableMetadata;
8+
import java.util.ArrayList;
79
import java.util.Arrays;
810
import java.util.List;
911

@@ -28,10 +30,18 @@ public JobConf.JobTypeEnum getType() {
2830

2931
  @Override
  protected List<String> getArgs() {
    // Translate the table's history policy into snapshot-expiration job arguments.
    // NOTE(review): metadata.historyConfig is declared @Nullable on TableMetadata,
    // so this dereference can NPE for tables without a history policy — confirm the
    // scheduler filters such tables out before building this task.
    HistoryConfig config = metadata.getHistoryConfig();
    List<String> jobArgs = new ArrayList<>();
    // maxAge > 0 means an age-based policy is set; granularity accompanies it.
    if (config.getMaxAge() > 0) {
      jobArgs.addAll(
          Arrays.asList(
              "--maxAge", Integer.toString(config.getMaxAge()),
              "--granularity", config.getGranularity().getValue()));
    }
    // versions > 0 means a version-count retention policy is set.
    if (config.getVersions() > 0) {
      jobArgs.addAll(Arrays.asList("--versions", Integer.toString(config.getVersions())));
    }
    // NOTE(review): unlike the previous implementation, "--tableName" is no longer
    // emitted here — verify the task framework supplies it, since the Spark app
    // reads cmdLine.getOptionValue("tableName") with no default.
    return jobArgs;
  }
3646

3747
@Override

apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java

+30-9
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.linkedin.openhouse.jobs.spark;
22

3+
import com.google.common.collect.Iterators;
34
import com.google.common.collect.Lists;
45
import com.linkedin.openhouse.common.stats.model.IcebergTableStats;
56
import com.linkedin.openhouse.jobs.util.SparkJobUtil;
@@ -20,6 +21,7 @@
2021
import org.apache.hadoop.fs.Path;
2122
import org.apache.hadoop.fs.RemoteIterator;
2223
import org.apache.iceberg.CatalogUtil;
24+
import org.apache.iceberg.ExpireSnapshots;
2325
import org.apache.iceberg.Schema;
2426
import org.apache.iceberg.StructLike;
2527
import org.apache.iceberg.Table;
@@ -201,17 +203,36 @@ public void deleteStagedOrphanDirectory(
201203
}
202204

203205
/** Expire snapshots on a given fully-qualified table name. */
204-
public void expireSnapshots(String fqtn, long expireBeforeTimestampMs) {
205-
expireSnapshots(getTable(fqtn), expireBeforeTimestampMs);
206+
public void expireSnapshots(String fqtn, int maxAge, String granularity, int versions) {
207+
expireSnapshots(getTable(fqtn), maxAge, granularity, versions);
206208
}
207209

208-
/** Expire snapshots on a given {@link Table}. */
209-
public void expireSnapshots(Table table, long expireBeforeTimestampMs) {
210-
table
211-
.expireSnapshots()
212-
.cleanExpiredFiles(false)
213-
.expireOlderThan(expireBeforeTimestampMs)
214-
.commit();
210+
/**
211+
* Expire snapshots on a given {@link Table}. If maxAge is provided, it will expire snapshots
212+
* older than maxAge in granularity timeunit. If versions is provided, it will retain the last
213+
* versions snapshots. If both are provided, it will prioritize maxAge; only retain up to versions
214+
* number of snapshots younger than the maxAge
215+
*/
216+
public void expireSnapshots(Table table, int maxAge, String granularity, int versions) {
217+
ExpireSnapshots expireSnapshotsCommand = table.expireSnapshots().cleanExpiredFiles(false);
218+
219+
// maxAge is always defined with granularity
220+
if (!granularity.isEmpty()) {
221+
TimeUnit timeUnitGranularity = TimeUnit.valueOf(granularity.toUpperCase());
222+
long expireBeforeTimestampMs =
223+
System.currentTimeMillis() - timeUnitGranularity.toMillis(maxAge);
224+
log.info("Expiring snapshots for table: {} older than {}ms", table, expireBeforeTimestampMs);
225+
expireSnapshotsCommand.expireOlderThan(expireBeforeTimestampMs).commit();
226+
}
227+
if (versions > 0 && Iterators.size(table.snapshots().iterator()) > versions) {
228+
log.info("Expiring snapshots for table: {} retaining last {} versions", table, versions);
229+
// Note: retainLast keeps the last N snapshots that WOULD be expired, hence expireOlderThan
230+
// currentTime
231+
expireSnapshotsCommand
232+
.expireOlderThan(System.currentTimeMillis())
233+
.retainLast(versions)
234+
.commit();
235+
}
215236
}
216237

217238
/**

apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/SnapshotsExpirationSparkApp.java

+40-14
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import com.linkedin.openhouse.jobs.spark.state.StateManager;
44
import java.util.ArrayList;
55
import java.util.List;
6-
import java.util.concurrent.TimeUnit;
76
import lombok.extern.slf4j.Slf4j;
87
import org.apache.commons.cli.CommandLine;
98
import org.apache.commons.cli.Option;
@@ -18,41 +17,68 @@
1817
@Slf4j
1918
public class SnapshotsExpirationSparkApp extends BaseTableSparkApp {
2019
private final String granularity;
21-
private final int count;
20+
private final int maxAge;
21+
private final int versions;
22+
23+
public static class DEFAULT_CONFIGURATION {
24+
public static final int MAX_AGE = 3;
25+
public static final String GRANULARITY = "DAYS";
26+
public static final int VERSIONS = 0;
27+
}
28+
29+
private static final String DEFAULT_GRANULARITY = "";
30+
31+
// By default do not define versions, and only retain snapshots based on max age
32+
private static final String DEFAULT_VERSIONS = "0";
2233

2334
public SnapshotsExpirationSparkApp(
24-
String jobId, StateManager stateManager, String fqtn, String granularity, int count) {
35+
String jobId,
36+
StateManager stateManager,
37+
String fqtn,
38+
int maxAge,
39+
String granularity,
40+
int versions) {
2541
super(jobId, stateManager, fqtn);
26-
this.granularity = granularity;
27-
this.count = count;
42+
if (maxAge == 0 && versions == 0) {
43+
this.maxAge = DEFAULT_CONFIGURATION.MAX_AGE;
44+
this.granularity = DEFAULT_CONFIGURATION.GRANULARITY;
45+
this.versions = DEFAULT_CONFIGURATION.VERSIONS;
46+
} else {
47+
this.granularity = granularity;
48+
this.maxAge = maxAge;
49+
this.versions = versions;
50+
}
2851
}
2952

3053
@Override
3154
protected void runInner(Operations ops) {
3255
log.info(
33-
"Snapshot expiration app start for table {}, expiring older than {} {}s",
56+
"Snapshot expiration app start for table {}, expiring older than {} {}s or with more than {} versions",
3457
fqtn,
35-
count,
36-
granularity);
37-
long expireBeforeTimestampMs = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(count);
38-
log.info("Expire snapshots before timestamp ms {}", expireBeforeTimestampMs);
39-
ops.expireSnapshots(fqtn, expireBeforeTimestampMs);
58+
maxAge,
59+
granularity,
60+
versions);
61+
ops.expireSnapshots(fqtn, maxAge, granularity, versions);
4062
}
4163

4264
public static void main(String[] args) {
4365
List<Option> extraOptions = new ArrayList<>();
4466
extraOptions.add(new Option("t", "tableName", true, "Fully-qualified table name"));
67+
extraOptions.add(
68+
new Option("a", "maxAge", true, "Delete snapshots older than <maxAge> <granularity>s"));
4569
extraOptions.add(new Option("g", "granularity", true, "Granularity: day"));
4670
extraOptions.add(
47-
new Option("c", "count", true, "Delete snapshots older than <count> <granularity>s"));
71+
new Option("v", "versions", true, "Number of versions to keep after snapshot expiration"));
4872
CommandLine cmdLine = createCommandLine(args, extraOptions);
73+
4974
SnapshotsExpirationSparkApp app =
5075
new SnapshotsExpirationSparkApp(
5176
getJobId(cmdLine),
5277
createStateManager(cmdLine),
5378
cmdLine.getOptionValue("tableName"),
54-
cmdLine.getOptionValue("granularity"),
55-
Integer.parseInt(cmdLine.getOptionValue("count")));
79+
Integer.parseInt(cmdLine.getOptionValue("maxAge", "0")),
80+
cmdLine.getOptionValue("granularity", ""),
81+
Integer.parseInt(cmdLine.getOptionValue("minVersions", "0")));
5682
app.run();
5783
}
5884
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.linkedin.openhouse.jobs.util;

import com.linkedin.openhouse.tables.client.model.History;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;

/** History Policy config class. This is app side representation of /tables policies->history */
@Builder
@Getter
@EqualsAndHashCode
@ToString
public class HistoryConfig {
  // Maximum snapshot age in units of granularity; consumers treat values <= 0 as "unset".
  private final int maxAge;
  // Number of most-recent snapshots to retain; consumers treat values <= 0 as "unset".
  private final int versions;
  // Time unit for maxAge, from the /tables client model (e.g. HOUR/DAY — see History.GranularityEnum).
  private final History.GranularityEnum granularity;
}

apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/TableMetadata.java

+1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ public class TableMetadata extends Metadata {
2424
protected boolean isClustered;
2525
@Builder.Default protected @NonNull Map<String, String> jobExecutionProperties = new HashMap<>();
2626
protected @Nullable RetentionConfig retentionConfig;
27+
protected @Nullable HistoryConfig historyConfig;
2728

2829
public String fqtn() {
2930
return String.format("%s.%s", dbName, tableName);

0 commit comments

Comments
 (0)