Add test for compaction with filter #263

Draft: wants to merge 1 commit into base: main
@@ -30,7 +30,10 @@
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.execution.datasources.SparkExpressionConverter;
import scala.collection.JavaConverters;

/**
@@ -298,6 +301,56 @@ public List<Path> deleteStagedFiles(Path baseDir, int olderThanDays, boolean rec
* run, it can compact all partitions, hence we should be careful to set
* max-concurrent-file-group-rewrites to a lower number.
*/
public RewriteDataFiles.Result rewriteDataFiles(
Table table,
long targetByteSize,
long minByteSize,
long maxByteSize,
int minInputFiles,
int maxConcurrentFileGroupRewrites,
boolean partialProgressEnabled,
int partialProgressMaxCommits,
String where) {
RewriteDataFiles rewriteAction =
SparkActions.get(spark)
.rewriteDataFiles(table)
.binPack()
// maximum number of file groups to be simultaneously rewritten
.option(
"max-concurrent-file-group-rewrites",
Integer.toString(maxConcurrentFileGroupRewrites))
// enable committing groups of files prior to the entire rewrite completing
.option("partial-progress.enabled", Boolean.toString(partialProgressEnabled))
// maximum number of commits that this rewrite is allowed to produce if partial
// progress is enabled
.option("partial-progress.max-commits", Integer.toString(partialProgressMaxCommits))
// any file group exceeding this number of files will be rewritten regardless of other
// criteria
.option("min-input-files", Integer.toString(minInputFiles))
// target size for output files
.option("target-file-size-bytes", Long.toString(targetByteSize))
// files under this threshold will be considered for rewriting regardless of any other
// criteria
.option("min-file-size-bytes", Long.toString(minByteSize))
// files with sizes above this threshold will be considered for rewriting regardless of
// any other criteria
.option("max-file-size-bytes", Long.toString(maxByteSize));

if (where != null) {
Expression expression;
try {
expression =
SparkExpressionConverter.collectResolvedSparkExpression(spark, table.name(), where);
} catch (AnalysisException e) {
throw new RuntimeException("Failed to parse filter expression", e);
}
rewriteAction.filter(SparkExpressionConverter.convertToIcebergExpression(expression));
}
return rewriteAction.execute();
}

public RewriteDataFiles.Result rewriteDataFiles(
Table table,
long targetByteSize,
@@ -307,29 +360,16 @@ public RewriteDataFiles.Result rewriteDataFiles(
int maxConcurrentFileGroupRewrites,
boolean partialProgressEnabled,
int partialProgressMaxCommits) {
return SparkActions.get(spark)
.rewriteDataFiles(table)
.binPack()
// maximum number of file groups to be simultaneously rewritten
.option(
"max-concurrent-file-group-rewrites", Integer.toString(maxConcurrentFileGroupRewrites))
// enable committing groups of files prior to the entire rewrite completing
.option("partial-progress.enabled", Boolean.toString(partialProgressEnabled))
// maximum amount of commits that this rewrite is allowed to produce if partial progress is
// enabled
.option("partial-progress.max-commits", Integer.toString(partialProgressMaxCommits))
// any file group exceeding this number of files will be rewritten regardless of other
// criteria
.option("min-input-files", Integer.toString(minInputFiles))
// 512MB
.option("target-file-size-bytes", Long.toString(targetByteSize))
// files under this threshold will be considered for rewriting regardless of any other
// criteria
.option("min-file-size-bytes", Long.toString(minByteSize))
// files with sizes above this threshold will be considered for rewriting regardless of any
// other criteria
.option("max-file-size-bytes", Long.toString(maxByteSize))
.execute();
return rewriteDataFiles(
table,
targetByteSize,
minByteSize,
maxByteSize,
minInputFiles,
maxConcurrentFileGroupRewrites,
partialProgressEnabled,
partialProgressMaxCommits,
null);
}

public void rename(final Path src, final Path dest) throws IOException {
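For reference, a minimal usage sketch of the filtered rewriteDataFiles overload added above. The table name, size thresholds, commit limits, and filter value are hypothetical, and an Operations handle, a Meter, and an @Slf4j logger are assumed to be wired up the same way as in the tests below.

try (Operations ops = Operations.withCatalog(spark, meter)) {
  Table table = ops.getTable("db.events"); // hypothetical table name
  RewriteDataFiles.Result result =
      ops.rewriteDataFiles(
          table,
          1024L * 1024 * 512,  // target-file-size-bytes (512MB)
          1024L * 1024 * 256,  // min-file-size-bytes (256MB)
          1024L * 1024 * 1024, // max-file-size-bytes (1GB)
          5,                   // min-input-files
          1,                   // max-concurrent-file-group-rewrites
          true,                // partial-progress.enabled
          10,                  // partial-progress.max-commits
          "ts > TIMESTAMP '2024-01-01'"); // Spark SQL filter; passing null compacts all partitions
  log.info(
      "Rewrote {} data files into {} files",
      result.rewrittenDataFilesCount(),
      result.addedDataFilesCount());
}

Because the where clause is resolved through SparkExpressionConverter, it is written in Spark SQL syntax against the table's columns, and only files matching the predicate (here, the newer partitions) are considered for rewriting.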
@@ -9,11 +9,14 @@
import com.linkedin.openhouse.tables.client.model.Retention;
import com.linkedin.openhouse.tablestest.OpenHouseSparkITest;
import io.opentelemetry.api.metrics.Meter;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
@@ -472,6 +475,65 @@ public void testDataCompactionPartialProgressPartitionedTable() throws Exception
}
}

@Test
public void testDataCompactionPartitionedTableWithFilter() throws Exception {
final String tableName = "db.test_data_compaction_partitioned_with_filter";
final int numInsertsPerPartition = 3;
final int numDailyPartitions = 10;
final int numCompactedPartitions = 5;
final int maxCommits = 5;
long fixedTimestampMillis = System.currentTimeMillis();
long fixedTimestampSeconds = fixedTimestampMillis / 1000;
final String cutOffDate =
new SimpleDateFormat("yyyy-MM-dd")
.format(
new Date(fixedTimestampMillis - TimeUnit.DAYS.toMillis(numCompactedPartitions)));

BiFunction<Operations, Table, RewriteDataFiles.Result> rewriteFunc =
(ops, table) ->
ops.rewriteDataFiles(
table,
1024 * 1024, // 1MB
1024, // 1KB
1024 * 1024 * 2, // 2MB
2,
1,
false,
maxCommits,
String.format("ts > TIMESTAMP '%s'", cutOffDate));

try (Operations ops = Operations.withCatalog(getSparkSession(), meter)) {
prepareTable(ops, tableName, true);
for (int daysLag = 0; daysLag < numDailyPartitions; ++daysLag) {
populateTable(ops, tableName, numInsertsPerPartition, daysLag, fixedTimestampSeconds);
}
log.info("Produced the following data files:");
getDataFiles(ops, tableName).forEach(f -> log.info(f.toString()));
Table table = ops.getTable(tableName);
log.info("Loaded table {}, location {}", table.name(), table.location());
RewriteDataFiles.Result result = rewriteFunc.apply(ops, table);
log.info(
"Added {} data files, rewritten {} data files, rewritten {} bytes",
result.addedDataFilesCount(),
result.rewrittenDataFilesCount(),
result.rewrittenBytesCount());
Assertions.assertEquals(numCompactedPartitions, result.addedDataFilesCount());
Assertions.assertEquals(
numInsertsPerPartition * numCompactedPartitions, result.rewrittenDataFilesCount());
result
.rewriteResults()
.forEach(
fileGroupRewriteResult -> {
log.info(
"File group {} has {} added files, {} rewritten files, {} rewritten bytes",
Operations.groupInfoToString(fileGroupRewriteResult.info()),
fileGroupRewriteResult.addedDataFilesCount(),
fileGroupRewriteResult.rewrittenDataFilesCount(),
fileGroupRewriteResult.rewrittenBytesCount());
});
}
}

@Test
public void testOrphanDirsDeletionJavaAPI() throws Exception {
try (Operations ops = Operations.withCatalog(getSparkSession(), meter)) {