From 63a3d689172cbe06627d969d4a7073a396bdea65 Mon Sep 17 00:00:00 2001 From: Will-Lo Date: Thu, 12 Dec 2024 16:11:39 -0500 Subject: [PATCH 1/8] WIP Implement history configurations in expiration jobs --- .../tasks/TableSnapshotsExpirationTask.java | 18 +++++++-- .../openhouse/jobs/spark/Operations.java | 7 ++-- .../spark/SnapshotsExpirationSparkApp.java | 40 +++++++++++++------ .../openhouse/jobs/util/HistoryConfig.java | 18 +++++++++ .../openhouse/jobs/util/TableMetadata.java | 1 + .../openhouse/jobs/spark/OperationsTest.java | 34 +++++++++++++++- 6 files changed, 97 insertions(+), 21 deletions(-) create mode 100644 apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/HistoryConfig.java diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/TableSnapshotsExpirationTask.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/TableSnapshotsExpirationTask.java index ed7d69096..8a32eb87b 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/TableSnapshotsExpirationTask.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/TableSnapshotsExpirationTask.java @@ -3,7 +3,9 @@ import com.linkedin.openhouse.jobs.client.JobsClient; import com.linkedin.openhouse.jobs.client.TablesClient; import com.linkedin.openhouse.jobs.client.model.JobConf; +import com.linkedin.openhouse.jobs.util.HistoryConfig; import com.linkedin.openhouse.jobs.util.TableMetadata; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -28,10 +30,18 @@ public JobConf.JobTypeEnum getType() { @Override protected List getArgs() { - return Arrays.asList( - "--tableName", metadata.fqtn(), - "--granularity", "days", - "--count", "3"); + HistoryConfig config = metadata.getHistoryConfig(); + List jobArgs = new ArrayList<>(); + if (config.getMaxAge() > 0) { + jobArgs.addAll( + Arrays.asList( + "--maxAge", Integer.toString(config.getMaxAge()), + "--granularity", config.getGranularity().getValue())); + } + if (config.getMinVersions() > 0) { + jobArgs.addAll(Arrays.asList("--minVersions", Integer.toString(config.getMinVersions()))); + } + return jobArgs; } @Override diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java index 02d5ffbc9..6842690a8 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java @@ -201,16 +201,17 @@ public void deleteStagedOrphanDirectory( } /** Expire snapshots on a given fully-qualified table name. */ - public void expireSnapshots(String fqtn, long expireBeforeTimestampMs) { - expireSnapshots(getTable(fqtn), expireBeforeTimestampMs); + public void expireSnapshots(String fqtn, long expireBeforeTimestampMs, int minVersions) { + expireSnapshots(getTable(fqtn), expireBeforeTimestampMs, minVersions); } /** Expire snapshots on a given {@link Table}. */ - public void expireSnapshots(Table table, long expireBeforeTimestampMs) { + public void expireSnapshots(Table table, long expireBeforeTimestampMs, int minVersions) { table .expireSnapshots() .cleanExpiredFiles(false) .expireOlderThan(expireBeforeTimestampMs) + .retainLast(minVersions) .commit(); } diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/SnapshotsExpirationSparkApp.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/SnapshotsExpirationSparkApp.java index 72c52324e..1a071a155 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/SnapshotsExpirationSparkApp.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/SnapshotsExpirationSparkApp.java @@ -18,41 +18,57 @@ @Slf4j public class SnapshotsExpirationSparkApp extends BaseTableSparkApp { private final String granularity; - private final int count; + private final int maxAge; + private final int minVersions; + + private static final String DEFAULT_MAX_AGE = "3"; + + private static final String DEFAULT_GRANULARITY = "days"; + + private static final String DEFAULT_MIN_VERSIONS = "100"; public SnapshotsExpirationSparkApp( - String jobId, StateManager stateManager, String fqtn, String granularity, int count) { + String jobId, + StateManager stateManager, + String fqtn, + int maxAge, + String granularity, + int minVersions) { super(jobId, stateManager, fqtn); this.granularity = granularity; - this.count = count; + this.maxAge = maxAge; + this.minVersions = minVersions; } @Override protected void runInner(Operations ops) { log.info( - "Snapshot expiration app start for table {}, expiring older than {} {}s", + "Snapshot expiration app start for table {}, expiring older than {} {}s or with more than {} versions", fqtn, - count, - granularity); - long expireBeforeTimestampMs = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(count); + maxAge, + granularity, + minVersions); + long expireBeforeTimestampMs = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(maxAge); log.info("Expire snapshots before timestamp ms {}", expireBeforeTimestampMs); - ops.expireSnapshots(fqtn, expireBeforeTimestampMs); + ops.expireSnapshots(fqtn, expireBeforeTimestampMs, minVersions); } public static void main(String[] args) { List