
Commit 3520f4f

HIVE-28590: Iceberg: Add support for FILE_SIZE_THRESHOLD to compaction command (#5540) (Dmitriy Fingerman reviewed by Denys Kuzmenko)
1 parent 341f597 commit 3520f4f

44 files changed: +1120 -196 lines


common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (-2)

@@ -2245,8 +2245,6 @@ public static enum ConfVars {
         "If this is set to true the URI for auth will have the default location masked with DEFAULT_TABLE_LOCATION"),
     HIVE_ICEBERG_ALLOW_DATAFILES_IN_TABLE_LOCATION_ONLY("hive.iceberg.allow.datafiles.in.table.location.only", false,
         "If this is set to true, then all the data files being read should be withing the table location"),
-    HIVE_ICEBERG_COMPACTION_TARGET_FILE_SIZE("hive.iceberg.compaction.target.file.size", "128mb",
-        new SizeValidator(), "Target file size for Iceberg compaction."),
     HIVE_USE_EXPLICIT_RCFILE_HEADER("hive.exec.rcfile.use.explicit.header", true,
         "If this is set the header for RCFiles will simply be RCF. If this is not\n" +
         "set the header will be that borrowed from sequence files, e.g. SEQ- followed\n" +

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java (+12 -4)

@@ -50,6 +50,7 @@
 import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.session.SessionStateUtil;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactorContext;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapred.JobContextImpl;
@@ -516,7 +517,13 @@ private void commitTable(FileIO io, ExecutorService executor, OutputTable output
           .map(x -> x.getJobConf().get(IcebergCompactionService.PARTITION_PATH))
           .orElse(null);

-      commitCompaction(table, snapshotId, startTime, filesForCommit, partitionPath);
+      long fileSizeThreshold = jobContexts.stream()
+          .findAny()
+          .map(x -> x.getJobConf().get(CompactorContext.COMPACTION_FILE_SIZE_THRESHOLD))
+          .map(Long::parseLong)
+          .orElse(-1L);
+
+      commitCompaction(table, snapshotId, startTime, filesForCommit, partitionPath, fileSizeThreshold);
     } else {
       commitOverwrite(table, branchName, snapshotId, startTime, filesForCommit);
     }
@@ -614,9 +621,10 @@ private void commit(SnapshotUpdate<?> update) {
    * @param partitionPath The path of the compacted partition
    */
   private void commitCompaction(Table table, Long snapshotId, long startTime, FilesForCommit results,
-      String partitionPath) {
-    List<DataFile> existingDataFiles = IcebergCompactionUtil.getDataFiles(table, partitionPath);
-    List<DeleteFile> existingDeleteFiles = IcebergCompactionUtil.getDeleteFiles(table, partitionPath);
+      String partitionPath, long fileSizeThreshold) {
+    List<DataFile> existingDataFiles = IcebergCompactionUtil.getDataFiles(table, partitionPath, fileSizeThreshold);
+    List<DeleteFile> existingDeleteFiles = fileSizeThreshold == -1 ?
+        IcebergCompactionUtil.getDeleteFiles(table, partitionPath) : Collections.emptyList();

     RewriteFiles rewriteFiles = table.newRewrite();
     existingDataFiles.forEach(rewriteFiles::deleteFile);
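
A minimal sketch of the rule this diff introduces at commit time (the helper class below is hypothetical; only the -1 sentinel and the comparisons come from the change): a positive threshold, set by the minor-compaction path, limits the rewrite to data files below that size and leaves existing delete files untouched, while -1 keeps the old behaviour of replacing every data and delete file in scope.

  // Illustration only; mirrors the threshold handling added to commitCompaction() above.
  final class ThresholdRule {
    // -1 means "no threshold": every data file in scope is rewritten (major compaction).
    static boolean rewriteDataFile(long fileSizeInBytes, long fileSizeThreshold) {
      return fileSizeThreshold == -1 || fileSizeInBytes < fileSizeThreshold;
    }

    // Existing delete files are only dropped when there is no threshold, i.e. for major compaction.
    static boolean dropExistingDeleteFiles(long fileSizeThreshold) {
      return fileSizeThreshold == -1;
    }
  }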

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergCompactionService.java (+7 -4)

@@ -26,7 +26,7 @@
 import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
 import org.apache.hadoop.hive.ql.txn.compactor.service.CompactionService;
 import org.apache.iceberg.mr.hive.IcebergTableUtil;
-import org.apache.iceberg.mr.hive.compaction.evaluator.IcebergCompactionEvaluator;
+import org.apache.iceberg.mr.hive.compaction.evaluator.CompactionEvaluator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -40,8 +40,9 @@ public IcebergCompactionService() {

   public Boolean compact(Table table, CompactionInfo ci) throws Exception {

-    if (!ci.isMajorCompaction()) {
-      ci.errorMessage = "Presently Iceberg tables support only Major compaction";
+    if (!ci.isMajorCompaction() && !ci.isMinorCompaction()) {
+      ci.errorMessage = String.format(
+          "Iceberg tables do not support %s compaction type, supported types are ['MINOR', 'MAJOR']", ci.type.name());
       LOG.error(ci.errorMessage + " Compaction info: {}", ci);
       try {
         msc.markRefused(CompactionInfo.compactionInfoToStruct(ci));
@@ -53,7 +54,9 @@ public Boolean compact(Table table, CompactionInfo ci) throws Exception {
     CompactorUtil.checkInterrupt(CLASS_NAME);

     org.apache.iceberg.Table icebergTable = IcebergTableUtil.getTable(conf, table);
-    if (!IcebergCompactionEvaluator.isEligibleForCompaction(icebergTable, ci.partName, ci.type, conf)) {
+    CompactionEvaluator compactionEvaluator = new CompactionEvaluator(icebergTable, ci,
+        table.getParameters());
+    if (!compactionEvaluator.isEligibleForCompaction()) {
       LOG.info("Table={}{} doesn't meet requirements for compaction", table.getTableName(),
           ci.partName == null ? "" : ", partition=" + ci.partName);
       msc.markRefused(CompactionInfo.compactionInfoToStruct(ci));

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergCompactionUtil.java (+8 -2)

@@ -55,6 +55,12 @@ public static boolean shouldIncludeForCompaction(Table table, String partitionPa
         table.specs().get(file.specId()).partitionToPath(file.partition()).equals(partitionPath);
   }

+  public static boolean shouldIncludeForCompaction(Table table, String partitionPath, ContentFile<?> file,
+      long fileSizeThreshold) {
+    return shouldIncludeForCompaction(table, partitionPath, file) &&
+        (fileSizeThreshold == -1 || file.fileSizeInBytes() < fileSizeThreshold);
+  }
+
   /**
    * Returns table's list of data files as following:
    * 1. If the table is unpartitioned, returns all data files.
@@ -63,13 +69,13 @@ public static boolean shouldIncludeForCompaction(Table table, String partitionPa
    * @param table the iceberg table
    * @param partitionPath partition path
    */
-  public static List<DataFile> getDataFiles(Table table, String partitionPath) {
+  public static List<DataFile> getDataFiles(Table table, String partitionPath, long fileSizeThreshold) {
     CloseableIterable<FileScanTask> fileScanTasks =
         table.newScan().useSnapshot(table.currentSnapshot().snapshotId()).ignoreResiduals().planFiles();
     CloseableIterable<FileScanTask> filteredFileScanTasks =
         CloseableIterable.filter(fileScanTasks, t -> {
           DataFile file = t.asFileScanTask().file();
-          return shouldIncludeForCompaction(table, partitionPath, file);
+          return shouldIncludeForCompaction(table, partitionPath, file, fileSizeThreshold);
         });
     return Lists.newArrayList(CloseableIterable.transform(filteredFileScanTasks, t -> t.file()));
   }
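
Hedged usage example for the widened getDataFiles signature (the table handle, partition path and the 16 MB figure are hypothetical):

  // No threshold: equivalent to the old two-argument behaviour, every data file in scope is returned.
  List<DataFile> allFiles = IcebergCompactionUtil.getDataFiles(table, partitionPath, -1L);
  // With a threshold: only data files smaller than 16 MB are returned for rewriting.
  List<DataFile> smallFiles = IcebergCompactionUtil.getDataFiles(table, partitionPath, 16L * 1024 * 1024);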

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergMajorQueryCompactor.java → iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergQueryCompactor.java (renamed, +21 -7)

@@ -26,6 +26,7 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
 import org.apache.hadoop.hive.ql.Context.RewritePolicy;
 import org.apache.hadoop.hive.ql.DriverUtils;
@@ -40,12 +41,13 @@
 import org.apache.hive.iceberg.org.apache.orc.storage.common.TableName;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.mr.hive.IcebergTableUtil;
+import org.apache.iceberg.mr.hive.compaction.evaluator.CompactionEvaluator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-public class IcebergMajorQueryCompactor extends QueryCompactor {
+public class IcebergQueryCompactor extends QueryCompactor {

-  private static final Logger LOG = LoggerFactory.getLogger(IcebergMajorQueryCompactor.class.getName());
+  private static final Logger LOG = LoggerFactory.getLogger(IcebergQueryCompactor.class.getName());

   @Override
   public boolean run(CompactorContext context) throws IOException, HiveException, InterruptedException {
@@ -62,20 +64,32 @@ public boolean run(CompactorContext context) throws IOException, HiveException,
     Table icebergTable = IcebergTableUtil.getTable(conf, table.getTTable());
     String compactionQuery;
     String orderBy = ci.orderByClause == null ? "" : ci.orderByClause;
+    String fileSizePredicate = null;
+
+    if (ci.type == CompactionType.MINOR) {
+      long fileSizeInBytesThreshold = CompactionEvaluator.getFragmentSizeBytes(table.getParameters());
+      fileSizePredicate = String.format("%1$s in (select file_path from %2$s.files where file_size_in_bytes < %3$d)",
+          VirtualColumn.FILE_PATH.getName(), compactTableName, fileSizeInBytesThreshold);
+      conf.setLong(CompactorContext.COMPACTION_FILE_SIZE_THRESHOLD, fileSizeInBytesThreshold);
+      // IOW query containing a join with Iceberg .files metadata table fails with exception that Iceberg AVRO format
+      // doesn't support vectorization, hence disabling it in this case.
+      conf.setBoolVar(ConfVars.HIVE_VECTORIZATION_ENABLED, false);
+    }

     if (partSpec == null) {
       if (!icebergTable.spec().isPartitioned()) {
         HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.FULL_TABLE.name());
-        compactionQuery = String.format("insert overwrite table %s select * from %<s %2$s", compactTableName, orderBy);
+        compactionQuery = String.format("insert overwrite table %s select * from %<s %2$s %3$s", compactTableName,
+            fileSizePredicate == null ? "" : "where " + fileSizePredicate, orderBy);
       } else if (icebergTable.specs().size() > 1) {
         // Compacting partitions of old partition specs on a partitioned table with partition evolution
         HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.PARTITION.name());
         // A single filter on a virtual column causes errors during compilation,
         // added another filter on file_path as a workaround.
         compactionQuery = String.format("insert overwrite table %1$s select * from %1$s " +
-            "where %2$s != %3$d and %4$s is not null %5$s",
+            "where %2$s != %3$d and %4$s is not null %5$s %6$s",
             compactTableName, VirtualColumn.PARTITION_SPEC_ID.getName(), icebergTable.spec().specId(),
-            VirtualColumn.FILE_PATH.getName(), orderBy);
+            VirtualColumn.FILE_PATH.getName(), fileSizePredicate == null ? "" : "and " + fileSizePredicate, orderBy);
       } else {
         // Partitioned table without partition evolution with partition spec as null in the compaction request - this
         // code branch is not supposed to be reachable
@@ -90,8 +104,8 @@ public boolean run(CompactorContext context) throws IOException, HiveException,
       Warehouse.makeSpecFromName(partSpecMap, new Path(partSpec), null);

       compactionQuery = String.format("insert overwrite table %1$s select * from %1$s where %2$s=%3$d " +
-          "and %4$s is not null %5$s", compactTableName, VirtualColumn.PARTITION_HASH.getName(), partitionHash,
-          VirtualColumn.FILE_PATH.getName(), orderBy);
+          "and %4$s is not null %5$s %6$s", compactTableName, VirtualColumn.PARTITION_HASH.getName(), partitionHash,
+          VirtualColumn.FILE_PATH.getName(), fileSizePredicate == null ? "" : "and " + fileSizePredicate, orderBy);
     }

     SessionState sessionState = setupQueryCompactionSession(conf, ci, tblProperties);
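
For orientation, the MINOR-compaction query built by the unpartitioned branch above roughly takes the shape below; the table name and the byte threshold are hypothetical, FILE__PATH stands in for whatever VirtualColumn.FILE_PATH.getName() returns, and any ORDER BY clause from the request is appended at the end:

  insert overwrite table db.tbl select * from db.tbl
  where FILE__PATH in (select file_path from db.tbl.files where file_size_in_bytes < 100663296)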

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/CompactionEvaluator.java (new file, +193)

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.iceberg.mr.hive.compaction.evaluator;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
import org.apache.hadoop.hive.ql.txn.compactor.CompactorContext;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.PartitionData;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.PartitionsTable;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.mr.hive.IcebergTableUtil;
import org.apache.iceberg.mr.hive.compaction.IcebergCompactionUtil;
import org.apache.iceberg.mr.hive.compaction.evaluator.amoro.CommonPartitionEvaluator;
import org.apache.iceberg.mr.hive.compaction.evaluator.amoro.IcebergTableFileScanHelper;
import org.apache.iceberg.mr.hive.compaction.evaluator.amoro.OptimizingConfig;
import org.apache.iceberg.mr.hive.compaction.evaluator.amoro.TableConfiguration;
import org.apache.iceberg.mr.hive.compaction.evaluator.amoro.TableFileScanHelper;
import org.apache.iceberg.mr.hive.compaction.evaluator.amoro.TableFormat;
import org.apache.iceberg.mr.hive.compaction.evaluator.amoro.TableProperties;
import org.apache.iceberg.mr.hive.compaction.evaluator.amoro.TableRuntime;
import org.apache.iceberg.mr.hive.compaction.evaluator.amoro.TableRuntimeMeta;
import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.StructProjection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CompactionEvaluator extends CommonPartitionEvaluator {

  private static final long LAST_OPTIMIZE_TIME = 0;
  private static final int TRIGGER_INTERVAL = -1;
  private final Table table;
  private final CompactionInfo ci;

  private static final Logger LOG = LoggerFactory.getLogger(CompactionEvaluator.class);

  public CompactionEvaluator(Table table, CompactionInfo ci, Map<String, String> parameters) throws IOException {
    super(
        createTableRuntime(table, parameters),
        getPartitionSpecStructPair(table, ci.partName),
        System.currentTimeMillis()
    );
    this.table = table;
    this.ci = ci;
    addFiles();
  }

  public boolean isEligibleForCompaction() {

    if (table.currentSnapshot() == null) {
      LOG.info("Table {}{} doesn't require compaction because it is empty", table,
          ci.partName == null ? "" : " partition " + ci.partName);
      return false;
    }

    addFiles();

    switch (ci.type) {
      case MINOR:
        return isMinorNecessary();
      case MAJOR:
        return isMajorNecessary();
      default:
        return false;
    }
  }

  private static TableRuntime createTableRuntime(Table icebergTable, Map<String, String> parameters) {
    OptimizingConfig optimizingConfig = OptimizingConfig.parse(Collections.emptyMap());
    optimizingConfig.setTargetSize(getTargetSizeBytes(parameters));
    optimizingConfig.setFragmentRatio(getFragmentRatio(parameters));
    optimizingConfig.setMinTargetSizeRatio(getMinTargetSizeRatio(parameters));
    optimizingConfig.setMinorLeastFileCount(getMinInputFiles(parameters));
    optimizingConfig.setMajorDuplicateRatio(getDeleteFileRatio(parameters));
    optimizingConfig.setFullTriggerInterval(TRIGGER_INTERVAL);
    optimizingConfig.setMinorLeastInterval(TRIGGER_INTERVAL);

    TableConfiguration tableConfig = new TableConfiguration();
    tableConfig.setOptimizingConfig(optimizingConfig);

    TableRuntimeMeta tableRuntimeMeta = new TableRuntimeMeta();
    tableRuntimeMeta.setTableName(icebergTable.name());
    tableRuntimeMeta.setFormat(TableFormat.ICEBERG);
    tableRuntimeMeta.setLastFullOptimizingTime(LAST_OPTIMIZE_TIME);
    tableRuntimeMeta.setLastMinorOptimizingTime(LAST_OPTIMIZE_TIME);
    tableRuntimeMeta.setTableConfig(tableConfig);

    return new HiveTableRuntime(tableRuntimeMeta);
  }

  private void addFiles() {
    TableFileScanHelper tableFileScanHelper = new IcebergTableFileScanHelper(table,
        table.currentSnapshot().snapshotId());
    try (CloseableIterable<TableFileScanHelper.FileScanResult> results =
             tableFileScanHelper.scan()) {
      for (TableFileScanHelper.FileScanResult fileScanResult : results) {
        DataFile file = fileScanResult.file();
        if (IcebergCompactionUtil.shouldIncludeForCompaction(table, ci.partName, file)) {
          addFile(fileScanResult.file(), fileScanResult.deleteFiles());
        }
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static long getTargetSizeBytes(Map<String, String> parameters) {
    return Optional.ofNullable(parameters.get(CompactorContext.COMPACTION_TARGET_SIZE))
        .map(Long::parseLong)
        .orElse(TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT);
  }

  public static double getMinTargetSizeRatio(Map<String, String> parameters) {
    return Optional.ofNullable(parameters.get(CompactorContext.COMPACTION_MIN_TARGET_SIZE_RATIO))
        .map(Double::parseDouble)
        .orElse(TableProperties.SELF_OPTIMIZING_MIN_TARGET_SIZE_RATIO_DEFAULT);
  }

  public static int getFragmentRatio(Map<String, String> parameters) {
    return Optional.ofNullable(parameters.get(CompactorContext.COMPACTION_MIN_FRAGMENT_RATIO))
        .map(x -> (int) (1 / Double.parseDouble(x)))
        .orElse(TableProperties.SELF_OPTIMIZING_FRAGMENT_RATIO_DEFAULT);
  }

  public static int getFragmentSizeBytes(Map<String, String> parameters) {
    return (int) (getTargetSizeBytes(parameters) * getMinTargetSizeRatio(parameters));
  }

  public static int getMinInputFiles(Map<String, String> parameters) {
    return Optional.ofNullable(parameters.get(CompactorContext.COMPACTION_MIN_INPUT_FILES))
        .map(Integer::parseInt)
        .orElse(TableProperties.SELF_OPTIMIZING_MINOR_TRIGGER_FILE_CNT_DEFAULT);
  }

  public static double getDeleteFileRatio(Map<String, String> parameters) {
    return Optional.ofNullable(parameters.get(CompactorContext.COMPACTION_DELETE_FILE_RATIO))
        .map(Double::parseDouble)
        .orElse(TableProperties.SELF_OPTIMIZING_MAJOR_TRIGGER_DUPLICATE_RATIO_DEFAULT);
  }

  private static Pair<Integer, StructLike> getPartitionSpecStructPair(Table table, String partitionPath)
      throws IOException {
    if (!table.spec().isPartitioned() || partitionPath == null) {
      return null;
    }
    PartitionsTable partitionsTable = (PartitionsTable) MetadataTableUtils
        .createMetadataTableInstance(table, MetadataTableType.PARTITIONS);
    try (CloseableIterable<FileScanTask> fileScanTasks = partitionsTable.newScan().planFiles()) {
      return FluentIterable.from(fileScanTasks)
          .transformAndConcat(task -> task.asDataTask().rows())
          .transform(row -> {
            StructLike data = row.get(IcebergTableUtil.PART_IDX, StructProjection.class);
            PartitionSpec spec = table.specs().get(row.get(IcebergTableUtil.SPEC_IDX, Integer.class));
            PartitionData partitionData = IcebergTableUtil.toPartitionData(data,
                Partitioning.partitionType(table), spec.partitionType());
            String path = spec.partitionToPath(partitionData);
            return Maps.immutableEntry(path, Pair.of(spec.specId(), data));
          })
          .filter(e -> e.getKey().equals(partitionPath))
          .transform(Map.Entry::getValue)
          .get(0);
    }
  }
}
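
The threshold that IcebergQueryCompactor plugs into the file_size_in_bytes predicate (and into COMPACTION_FILE_SIZE_THRESHOLD) is getFragmentSizeBytes(), i.e. the target size scaled by the minimum target-size ratio. A worked example with hypothetical values:

  // Hypothetical inputs, for illustration of getFragmentSizeBytes():
  long targetSizeBytes = 128L * 1024 * 1024;   // compaction target size, 128 MB
  double minTargetSizeRatio = 0.75;            // minimum acceptable fraction of the target size
  int fragmentSizeBytes = (int) (targetSizeBytes * minTargetSizeRatio);   // 100663296 bytes = 96 MB
  // Data files smaller than fragmentSizeBytes count as fragments; MINOR compaction rewrites only those.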
