diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java
index a4c955b8..9cc4b793 100644
--- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java
+++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java
@@ -30,7 +30,10 @@
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.spark.actions.SparkActions;
+import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.execution.datasources.SparkExpressionConverter;
 import scala.collection.JavaConverters;
 
 /**
@@ -298,6 +301,56 @@ public List deleteStagedFiles(Path baseDir, int olderThanDays, boolean rec
    * run, it can compact all partitions, hence we should be careful and to set the
    * max-concurrent-file-group-rewrites to lower number.
    */
+  public RewriteDataFiles.Result rewriteDataFiles(
+      Table table,
+      long targetByteSize,
+      long minByteSize,
+      long maxByteSize,
+      int minInputFiles,
+      int maxConcurrentFileGroupRewrites,
+      boolean partialProgressEnabled,
+      int partialProgressMaxCommits,
+      String where) {
+    RewriteDataFiles rewriteAction =
+        SparkActions.get(spark)
+            .rewriteDataFiles(table)
+            .binPack()
+            // maximum number of file groups to be simultaneously rewritten
+            .option(
+                "max-concurrent-file-group-rewrites",
+                Integer.toString(maxConcurrentFileGroupRewrites))
+            // enable committing groups of files prior to the entire rewrite completing
+            .option("partial-progress.enabled", Boolean.toString(partialProgressEnabled))
+            // maximum amount of commits that this rewrite is allowed to produce if partial progress
+            // is
+            // enabled
+            .option("partial-progress.max-commits", Integer.toString(partialProgressMaxCommits))
+            // any file group exceeding this number of files will be rewritten regardless of other
+            // criteria
+            .option("min-input-files", Integer.toString(minInputFiles))
+            // 512MB
+            .option("target-file-size-bytes", Long.toString(targetByteSize))
+            // files under this threshold will be considered for rewriting regardless of any other
+            // criteria
+            .option("min-file-size-bytes", Long.toString(minByteSize))
+            // files with sizes above this threshold will be considered for rewriting regardless of
+            // any
+            // other criteria
+            .option("max-file-size-bytes", Long.toString(maxByteSize));
+
+    if (where != null) {
+      Expression expression;
+      try {
+        expression =
+            SparkExpressionConverter.collectResolvedSparkExpression(spark(), table.name(), where);
+      } catch (AnalysisException e) {
+        throw new RuntimeException("Failed to parse filter expression", e);
+      }
+      rewriteAction.filter(SparkExpressionConverter.convertToIcebergExpression(expression));
+    }
+    return rewriteAction.execute();
+  }
+
   public RewriteDataFiles.Result rewriteDataFiles(
       Table table,
       long targetByteSize,
@@ -307,29 +360,16 @@ public RewriteDataFiles.Result rewriteDataFiles(
       int maxConcurrentFileGroupRewrites,
       boolean partialProgressEnabled,
       int partialProgressMaxCommits) {
-    return SparkActions.get(spark)
-        .rewriteDataFiles(table)
-        .binPack()
-        // maximum number of file groups to be simultaneously rewritten
-        .option(
-            "max-concurrent-file-group-rewrites", Integer.toString(maxConcurrentFileGroupRewrites))
-        // enable committing groups of files prior to the entire rewrite completing
-        .option("partial-progress.enabled", Boolean.toString(partialProgressEnabled))
-        // maximum amount of commits that this rewrite is allowed to produce if partial progress is
-        // enabled
-        .option("partial-progress.max-commits", Integer.toString(partialProgressMaxCommits))
-        // any file group exceeding this number of files will be rewritten regardless of other
-        // criteria
-        .option("min-input-files", Integer.toString(minInputFiles))
-        // 512MB
-        .option("target-file-size-bytes", Long.toString(targetByteSize))
-        // files under this threshold will be considered for rewriting regardless of any other
-        // criteria
-        .option("min-file-size-bytes", Long.toString(minByteSize))
-        // files with sizes above this threshold will be considered for rewriting regardless of any
-        // other criteria
-        .option("max-file-size-bytes", Long.toString(maxByteSize))
-        .execute();
+    return rewriteDataFiles(
+        table,
+        targetByteSize,
+        minByteSize,
+        maxByteSize,
+        minInputFiles,
+        maxConcurrentFileGroupRewrites,
+        partialProgressEnabled,
+        partialProgressMaxCommits,
+        null);
   }
 
   public void rename(final Path src, final Path dest) throws IOException {
diff --git a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OperationsTest.java b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OperationsTest.java
index 245bd7d2..43ddc420 100644
--- a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OperationsTest.java
+++ b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OperationsTest.java
@@ -9,11 +9,14 @@
 import com.linkedin.openhouse.tables.client.model.Retention;
 import com.linkedin.openhouse.tablestest.OpenHouseSparkITest;
 import io.opentelemetry.api.metrics.Meter;
+import java.text.SimpleDateFormat;
 import java.time.LocalDate;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
@@ -472,6 +475,65 @@ public void testDataCompactionPartialProgressPartitionedTable() throws Exception
     }
   }
 
+  @Test
+  public void testDataCompactionPartitionedTableWithFilter() throws Exception {
+    final String tableName = "db.test_data_compaction_partitioned_with_filter";
+    final int numInsertsPerPartition = 3;
+    final int numDailyPartitions = 10;
+    final int numCompactedPartitions = 5;
+    final int maxCommits = 5;
+    long fixedTimestampMillis = System.currentTimeMillis();
+    long fixedTimestampSeconds = fixedTimestampMillis / 1000;
+    final String cutOffDate =
+        new SimpleDateFormat("yyyy-MM-dd")
+            .format(
+                new Date(fixedTimestampMillis - TimeUnit.DAYS.toMillis(numCompactedPartitions)));
+
+    BiFunction<Operations, Table, RewriteDataFiles.Result> rewriteFunc =
+        (ops, table) ->
+            ops.rewriteDataFiles(
+                table,
+                1024 * 1024, // 1MB
+                1024, // 1KB
+                1024 * 1024 * 2, // 2MB
+                2,
+                1,
+                false,
+                maxCommits,
+                String.format("ts > TIMESTAMP '%s'", cutOffDate));
+
+    try (Operations ops = Operations.withCatalog(getSparkSession(), meter)) {
+      prepareTable(ops, tableName, true);
+      for (int daysLag = 0; daysLag < numDailyPartitions; ++daysLag) {
+        populateTable(ops, tableName, numInsertsPerPartition, daysLag, fixedTimestampSeconds);
+      }
+      log.info("Produced the following data files:");
+      getDataFiles(ops, tableName).forEach(f -> log.info(f.toString()));
+      Table table = ops.getTable(tableName);
+      log.info("Loaded table {}, location {}", table.name(), table.location());
+      RewriteDataFiles.Result result = rewriteFunc.apply(ops, table);
+      log.info(
+          "Added {} data files, rewritten {} data files, rewritten {} bytes",
+          result.addedDataFilesCount(),
+          result.rewrittenDataFilesCount(),
+          result.rewrittenBytesCount());
+      Assertions.assertEquals(numCompactedPartitions, result.addedDataFilesCount());
+      Assertions.assertEquals(
+          numInsertsPerPartition * numCompactedPartitions, result.rewrittenDataFilesCount());
+      result
+          .rewriteResults()
+          .forEach(
+              fileGroupRewriteResult -> {
+                log.info(
+                    "File group {} has {} added files, {} rewritten files, {} rewritten bytes",
+                    Operations.groupInfoToString(fileGroupRewriteResult.info()),
+                    fileGroupRewriteResult.addedDataFilesCount(),
+                    fileGroupRewriteResult.rewrittenDataFilesCount(),
+                    fileGroupRewriteResult.rewrittenBytesCount());
+              });
+    }
+  }
+
   @Test
   public void testOrphanDirsDeletionJavaAPI() throws Exception {
     try (Operations ops = Operations.withCatalog(getSparkSession(), meter)) {
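Illustrative caller sketch (not part of the diff above): the new overload takes a Spark SQL predicate string as its last argument, resolves it against the table via SparkExpressionConverter, and passes the resulting Iceberg expression to RewriteDataFiles.filter, so only matching partitions/files are compacted; passing null preserves the old full-table behavior. The class name, size thresholds, and the "ts" column below are assumptions for the example (the "ts" column mirrors the timestamp column used in OperationsTest), not values defined by this change.

import com.linkedin.openhouse.jobs.spark.Operations;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFiles;

// Hypothetical helper showing how a maintenance job might compact only recent data.
public final class CompactRecentPartitionsExample {

  // cutOffDate is a "yyyy-MM-dd" string; rows with ts after it are eligible for rewrite,
  // older partitions are left untouched. Pass null as `where` to compact the whole table.
  static RewriteDataFiles.Result compactRecent(Operations ops, Table table, String cutOffDate) {
    return ops.rewriteDataFiles(
        table,
        1024L * 1024 * 512, // target-file-size-bytes (512MB, illustrative)
        1024L * 1024 * 256, // min-file-size-bytes
        1024L * 1024 * 768, // max-file-size-bytes
        5, // min-input-files
        2, // max-concurrent-file-group-rewrites
        true, // partial-progress.enabled
        10, // partial-progress.max-commits
        String.format("ts > TIMESTAMP '%s'", cutOffDate)); // filter pushed to RewriteDataFiles
  }
}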