Commit 235581d

HIVE-28818: Snapshot-level Partition-Aware Optimization
1 parent cf6c961 commit 235581d

File tree

7 files changed: +91 -38 lines

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSplit.java

Lines changed: 3 additions & 4 deletions
@@ -24,7 +24,6 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Collection;
-import java.util.OptionalInt;
 import java.util.stream.IntStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.tez.HashableInputSplit;
@@ -98,17 +97,17 @@ public byte[] getBytesForHash() {
   }
 
   @Override
-  public OptionalInt getBucketId() {
+  public int getBucketId() {
     final StructLike key = innerSplit.taskGroup().groupingKey();
     if (key.size() == 0) {
-      return OptionalInt.empty();
+      throw new IllegalStateException("The grouping key is empty though a bucket id is requested");
     }
     final int[] bucketIds = IntStream
         .range(0, key.size())
        .map(i -> key.get(i, Integer.class))
        .toArray();
     final int hashCode = IcebergBucketFunction.getHashCode(bucketIds);
-    return OptionalInt.of(ObjectInspectorUtils.getBucketNumber(hashCode, numBuckets));
+    return ObjectInspectorUtils.getBucketNumber(hashCode, numBuckets);
   }
 
   @Override
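
Note on the computation above: the split's bucket number is derived from the bucket values stored in the task group's grouping key. A minimal sketch of the final folding step, assuming ObjectInspectorUtils.getBucketNumber follows the usual Hive convention of masking the sign bit and taking the remainder:

// Sketch only; approximates the assumed behavior of ObjectInspectorUtils.getBucketNumber.
static int toBucketNumber(int hashCode, int numBuckets) {
  // Mask the sign bit so a negative hash code still maps into [0, numBuckets).
  return (hashCode & Integer.MAX_VALUE) % numBuckets;
}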

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java

Lines changed: 24 additions & 7 deletions
@@ -968,25 +968,42 @@ private void addCustomSortExpr(Table table, org.apache.hadoop.hive.ql.metadata.
 
   @Override
   public boolean supportsPartitionAwareOptimization(org.apache.hadoop.hive.ql.metadata.Table table) {
-    if (hasUndergonePartitionEvolution(table)) {
-      // Don't support complex cases yet
+    final Table icebergTable = IcebergTableUtil.getTable(conf, table.getTTable());
+    final Snapshot snapshot = IcebergTableUtil.getTableSnapshot(icebergTable, table);
+    if (snapshot == null) {
+      LOG.info("Partition-Aware Optimization is not supported because an unknown snapshot is specified");
+      return false;
+    }
+
+    final Set<Integer> partitionSpecIds = IcebergTableUtil.getPartitionSpecIds(snapshot, icebergTable.io());
+    if (partitionSpecIds.size() != 1) {
+      LOG.info("Partition-Aware Optimization is not supported when multiple partition specs are combined: {}",
+          partitionSpecIds);
       return false;
     }
-    final List<TransformSpec> specs = getPartitionTransformSpec(table);
+    final int partitionSpecId = partitionSpecIds.iterator().next();
+    final List<TransformSpec> specs = IcebergTableUtil.getTransformSpecs(icebergTable, partitionSpecId);
     // Currently, we support the only bucket transform
     return specs.stream().anyMatch(IcebergTableUtil::isBucket);
   }
 
   @Override
   public PartitionAwareOptimizationCtx createPartitionAwareOptimizationContext(
       org.apache.hadoop.hive.ql.metadata.Table table) {
+    final Table icebergTable = IcebergTableUtil.getTable(conf, table.getTTable());
+    final Snapshot snapshot = Objects.requireNonNull(IcebergTableUtil.getTableSnapshot(icebergTable, table));
+    final Set<Integer> partitionSpecIds = IcebergTableUtil.getPartitionSpecIds(snapshot, icebergTable.io());
+    Preconditions.checkArgument(partitionSpecIds.size() == 1);
+    final int partitionSpecId = partitionSpecIds.iterator().next();
+
     // Currently, we support the only bucket transform
     final List<String> bucketColumnNames = Lists.newArrayList();
     final List<Integer> numBuckets = Lists.newArrayList();
-    getPartitionTransformSpec(table).stream().filter(IcebergTableUtil::isBucket).forEach(spec -> {
-      bucketColumnNames.add(spec.getColumnName());
-      numBuckets.add(spec.getTransformParam().get());
-    });
+    IcebergTableUtil.getTransformSpecs(icebergTable, partitionSpecId).stream().filter(IcebergTableUtil::isBucket)
+        .forEach(spec -> {
+          bucketColumnNames.add(spec.getColumnName());
+          numBuckets.add(spec.getTransformParam().get());
+        });
 
     if (bucketColumnNames.isEmpty()) {
       return null;
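
To summarize the new snapshot-level check: a table whose metadata contains several historical partition specs remains eligible as long as every data file reachable from the scanned snapshot was written with a single spec, and that spec includes a bucket transform. A condensed sketch of the eligibility logic above; the helper name isEligible is invented for illustration:

// Condensed illustration of supportsPartitionAwareOptimization; helper name is hypothetical.
static boolean isEligible(Table icebergTable, Snapshot snapshot) {
  if (snapshot == null) {
    return false;  // unknown snapshot, cannot reason about its files
  }
  Set<Integer> specIds = IcebergTableUtil.getPartitionSpecIds(snapshot, icebergTable.io());
  if (specIds.size() != 1) {
    return false;  // data files from mixed partition specs are not supported yet
  }
  return IcebergTableUtil.getTransformSpecs(icebergTable, specIds.iterator().next()).stream()
      .anyMatch(IcebergTableUtil::isBucket);  // at least one bucket(...) transform required
}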

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java

Lines changed: 25 additions & 3 deletions
@@ -27,9 +27,11 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.Set;
 import java.util.function.BinaryOperator;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -53,9 +55,13 @@
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionStateUtil;
+import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFiles;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.ManageSnapshots;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestReader;
 import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.MetadataTableUtils;
 import org.apache.iceberg.PartitionData;
@@ -78,6 +84,7 @@
 import org.apache.iceberg.expressions.ResidualEvaluator;
 import org.apache.iceberg.hive.HiveSchemaUtil;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
@@ -546,9 +553,24 @@ public static PartitionSpec getPartitionSpec(Table icebergTable, String partitio
   }
 
   public static TransformSpec getTransformSpec(Table table, String transformName, int sourceId) {
-    TransformSpec spec = TransformSpec.fromString(transformName.toUpperCase(),
-        table.schema().findColumnName(sourceId));
-    return spec;
+    return TransformSpec.fromString(transformName.toUpperCase(), table.schema().findColumnName(sourceId));
   }
 
+  public static List<TransformSpec> getTransformSpecs(Table table, int partitionSpecId) {
+    final PartitionSpec icebergSpec = table.specs().get(partitionSpecId);
+    return icebergSpec.fields().stream()
+        .map(f -> getTransformSpec(table, f.transform().toString(), f.sourceId()))
+        .collect(Collectors.toList());
+  }
+
+  public static Set<Integer> getPartitionSpecIds(Snapshot snapshot, FileIO io) {
+    final List<ManifestFile> manifestFiles = snapshot.allManifests(io);
+    return manifestFiles.parallelStream().flatMap(manifestFile -> {
+      try (ManifestReader<DataFile> entries = ManifestFiles.read(manifestFile, io)) {
+        return StreamSupport.stream(entries.spliterator(), false).map(DataFile::specId);
+      } catch (IOException e) {
+        throw new RuntimeException("Failed to read manifest file: " + manifestFile.path(), e);
+      }
+    }).collect(Collectors.toSet());
+  }
 }
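
A short usage sketch of the two new helpers; the Table instance named table and the printing are assumptions for illustration:

// Illustrative usage; "table" is assumed to be an already loaded Iceberg Table with a snapshot.
Snapshot current = table.currentSnapshot();
Set<Integer> specIds = IcebergTableUtil.getPartitionSpecIds(current, table.io());
if (specIds.size() == 1) {
  // A single id means every data file in the snapshot was written with the same partition spec,
  // so that spec's transforms describe the whole snapshot.
  List<TransformSpec> transforms = IcebergTableUtil.getTransformSpecs(table, specIds.iterator().next());
  transforms.stream().filter(IcebergTableUtil::isBucket)
      .forEach(t -> System.out.println("bucket(" + t.getTransformParam().get() + ", " + t.getColumnName() + ")"));
}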

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java

Lines changed: 18 additions & 5 deletions
@@ -21,8 +21,10 @@
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Consumer;
 import org.apache.commons.lang3.StringUtils;
@@ -45,10 +47,12 @@
 import org.apache.iceberg.DataTableScan;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.IncrementalAppendScan;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Partitioning;
 import org.apache.iceberg.Scan;
 import org.apache.iceberg.ScanTaskGroup;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.SystemConfigs;
 import org.apache.iceberg.Table;
@@ -60,6 +64,8 @@
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.iceberg.mr.hive.HiveIcebergStorageHandler;
+import org.apache.iceberg.mr.hive.IcebergTableUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types.StructType;
 import org.apache.iceberg.util.SerializationUtil;
@@ -197,11 +203,15 @@ private List<InputSplit> planInputSplits(Table table, Configuration conf, Execut
         InputFormatConfig.InMemoryDataModel.GENERIC);
 
     long fromVersion = conf.getLong(InputFormatConfig.SNAPSHOT_ID_INTERVAL_FROM, -1);
+    Snapshot snapshot;
     Scan<? extends Scan, FileScanTask, CombinedScanTask> scan;
     if (fromVersion != -1) {
+      snapshot = table.currentSnapshot();
       scan = applyConfig(conf, createIncrementalAppendScan(table, conf));
     } else {
-      scan = applyConfig(conf, createTableScan(table, conf));
+      TableScan tableScan = createTableScan(table, conf);
+      snapshot = tableScan.snapshot();
+      scan = applyConfig(conf, tableScan);
     }
     scan = scan.planWith(workerPool);
 
@@ -211,7 +221,7 @@ private List<InputSplit> planInputSplits(Table table, Configuration conf, Execut
     Path tableLocation = new Path(conf.get(InputFormatConfig.TABLE_LOCATION));
 
     String[] groupingPartitionColumns = conf.getStrings(InputFormatConfig.GROUPING_PARTITION_COLUMNS);
-    generateInputSplits(scan, table, groupingPartitionColumns, taskGroup -> {
+    generateInputSplits(scan, table, snapshot, groupingPartitionColumns, taskGroup -> {
       if (applyResidual && (model == InputFormatConfig.InMemoryDataModel.HIVE ||
           model == InputFormatConfig.InMemoryDataModel.PIG)) {
         // TODO: We do not support residual evaluation for HIVE and PIG in memory data model yet
@@ -241,7 +251,7 @@ private static void validateFileLocations(ScanTaskGroup<FileScanTask> split, Pat
     }
   }
 
-  private static void generateInputSplits(Scan<?, FileScanTask, CombinedScanTask> scan, Table table,
+  private static void generateInputSplits(Scan<?, FileScanTask, CombinedScanTask> scan, Table table, Snapshot snapshot,
      String[] groupingPartitionColumns, Consumer<ScanTaskGroup<FileScanTask>> consumer) {
     if (groupingPartitionColumns == null) {
      try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) {
@@ -250,8 +260,11 @@ private static void generateInputSplits(Scan<?, FileScanTask, CombinedScanTask>
        throw new UncheckedIOException(String.format("Failed to close table scan: %s", scan), e);
      }
    } else {
-      final StructType groupingKeyType = Partitioning.groupingKeyType(
-          table.schema().select(groupingPartitionColumns), table.specs().values());
+      final Schema schema = table.schemas().get(snapshot.schemaId()).select(groupingPartitionColumns);
+      final Set<Integer> specIds = IcebergTableUtil.getPartitionSpecIds(snapshot, table.io());
+      Preconditions.checkArgument(specIds.size() == 1);
+      final PartitionSpec partitionSpec = table.specs().get(specIds.iterator().next());
+      final StructType groupingKeyType = Partitioning.groupingKeyType(schema, Collections.singletonList(partitionSpec));
      try (CloseableIterable<FileScanTask> taskIterable = scan.planFiles()) {
        final List<FileScanTask> tasks = Lists.newArrayList(taskIterable);
        final List<ScanTaskGroup<FileScanTask>> partitionScanTaskGroups =
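
The central change in generateInputSplits is that the grouping key type is now derived from the snapshot's own schema and the single partition spec its data files use, rather than from every historical spec of the table. A hedged sketch of that derivation; the wrapper method is invented, and the reading of Partitioning.groupingKeyType (it keeps only fields consistent across the specs it is given) is an assumption:

// Sketch of the new grouping-key derivation; the method wrapper and comments are illustrative.
static StructType groupingKeyTypeFor(Table table, Snapshot snapshot, String[] groupingPartitionColumns) {
  Schema snapshotSchema = table.schemas().get(snapshot.schemaId()).select(groupingPartitionColumns);
  Set<Integer> specIds = IcebergTableUtil.getPartitionSpecIds(snapshot, table.io());
  Preconditions.checkArgument(specIds.size() == 1, "expected exactly one partition spec in the snapshot");
  PartitionSpec spec = table.specs().get(specIds.iterator().next());
  // Passing only the snapshot's spec keeps its bucket field in the grouping key, whereas
  // passing all historical specs could drop fields that are not common to every spec.
  return Partitioning.groupingKeyType(snapshotSchema, Collections.singletonList(spec));
}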

iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_minor_compaction_bucket.q.out

Lines changed: 15 additions & 15 deletions
@@ -291,7 +291,7 @@ POSTHOOK: Input: default@srcbucket_big
 Plan optimized by CBO.
 
 Vertex dependency in root stage
-Map 1 <- Map 3 (BROADCAST_EDGE)
+Map 1 <- Map 3 (CUSTOM_EDGE)
 Reducer 2 <- Map 1 (SIMPLE_EDGE)
 
 Stage-0
@@ -305,9 +305,9 @@ Stage-0
 <-Map 1 [SIMPLE_EDGE] vectorized, llap
 SHUFFLE [RS_35]
 Map Join Operator [MAPJOIN_34] (rows=6 width=271)
-Conds:SEL_33._col1=RS_31._col0(Inner),Output:["_col0","_col1","_col2","_col3","_col4"]
-<-Map 3 [BROADCAST_EDGE] vectorized, llap
-BROADCAST [RS_31]
+BucketMapJoin:true,Conds:SEL_33._col1=RS_31._col0(Inner),Output:["_col0","_col1","_col2","_col3","_col4"]
+<-Map 3 [CUSTOM_EDGE] vectorized, llap
+MULTICAST [RS_31]
 PartitionCols:_col0
 Select Operator [SEL_30] (rows=4 width=93)
 Output:["_col0","_col1"]
@@ -320,7 +320,7 @@ Stage-0
 Filter Operator [FIL_32] (rows=6 width=178)
 predicate:key is not null
 TableScan [TS_0] (rows=6 width=178)
-default@srcbucket_big,a,Tbl:COMPLETE,Col:COMPLETE,Output:["id","key","value"]
+default@srcbucket_big,a,Tbl:COMPLETE,Col:COMPLETE,Grouping Num Buckets:4,Grouping Partition Columns:["key"],Output:["id","key","value"]
 
 PREHOOK: query: SELECT *
 FROM default.srcbucket_big.tag_bucket_4 a
@@ -550,7 +550,7 @@ POSTHOOK: Input: default@srcbucket_big
 Plan optimized by CBO.
 
 Vertex dependency in root stage
-Map 1 <- Map 3 (BROADCAST_EDGE)
+Map 1 <- Map 3 (CUSTOM_EDGE)
 Reducer 2 <- Map 1 (SIMPLE_EDGE)
 
 Stage-0
@@ -564,9 +564,9 @@ Stage-0
 <-Map 1 [SIMPLE_EDGE] vectorized, llap
 SHUFFLE [RS_35]
 Map Join Operator [MAPJOIN_34] (rows=8 width=250)
-Conds:SEL_33._col1=RS_31._col0(Inner),Output:["_col0","_col1","_col2","_col3","_col4"]
-<-Map 3 [BROADCAST_EDGE] vectorized, llap
-BROADCAST [RS_31]
+BucketMapJoin:true,Conds:SEL_33._col1=RS_31._col0(Inner),Output:["_col0","_col1","_col2","_col3","_col4"]
+<-Map 3 [CUSTOM_EDGE] vectorized, llap
+MULTICAST [RS_31]
 PartitionCols:_col0
 Select Operator [SEL_30] (rows=4 width=93)
 Output:["_col0","_col1"]
@@ -579,7 +579,7 @@ Stage-0
 Filter Operator [FIL_32] (rows=8 width=157)
 predicate:key is not null
 TableScan [TS_0] (rows=8 width=157)
-default@srcbucket_big,a,Tbl:COMPLETE,Col:COMPLETE,Output:["id","key","value"]
+default@srcbucket_big,a,Tbl:COMPLETE,Col:COMPLETE,Grouping Num Buckets:8,Grouping Partition Columns:["key"],Output:["id","key","value"]
 
 PREHOOK: query: SELECT *
 FROM default.srcbucket_big a
@@ -626,7 +626,7 @@ POSTHOOK: Input: default@srcbucket_big
 Plan optimized by CBO.
 
 Vertex dependency in root stage
-Map 1 <- Map 3 (BROADCAST_EDGE)
+Map 1 <- Map 3 (CUSTOM_EDGE)
 Reducer 2 <- Map 1 (SIMPLE_EDGE)
 
 Stage-0
@@ -640,9 +640,9 @@ Stage-0
 <-Map 1 [SIMPLE_EDGE] vectorized, llap
 SHUFFLE [RS_35]
 Map Join Operator [MAPJOIN_34] (rows=6 width=271)
-Conds:SEL_33._col1=RS_31._col0(Inner),Output:["_col0","_col1","_col2","_col3","_col4"]
-<-Map 3 [BROADCAST_EDGE] vectorized, llap
-BROADCAST [RS_31]
+BucketMapJoin:true,Conds:SEL_33._col1=RS_31._col0(Inner),Output:["_col0","_col1","_col2","_col3","_col4"]
+<-Map 3 [CUSTOM_EDGE] vectorized, llap
+MULTICAST [RS_31]
 PartitionCols:_col0
 Select Operator [SEL_30] (rows=4 width=93)
 Output:["_col0","_col1"]
@@ -655,7 +655,7 @@ Stage-0
 Filter Operator [FIL_32] (rows=6 width=178)
 predicate:key is not null
 TableScan [TS_0] (rows=6 width=178)
-default@srcbucket_big,a,Tbl:COMPLETE,Col:COMPLETE,Output:["id","key","value"]
+default@srcbucket_big,a,Tbl:COMPLETE,Col:COMPLETE,Grouping Num Buckets:4,Grouping Partition Columns:["key"],Output:["id","key","value"]
 
 PREHOOK: query: SELECT *
 FROM default.srcbucket_big.tag_bucket_4 a

ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java

Lines changed: 4 additions & 1 deletion
@@ -71,6 +71,7 @@
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.util.Preconditions;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hive.common.util.HiveStringUtils;
 import org.apache.hive.common.util.Ref;
@@ -174,7 +175,9 @@ public String inputFormatClassName() {
 
     public OptionalInt getBucketId() {
       if (inputSplit instanceof PartitionAwareSplit) {
-        return ((PartitionAwareSplit) inputSplit).getBucketId();
+        final int bucketId = ((PartitionAwareSplit) inputSplit).getBucketId();
+        Preconditions.checkArgument(bucketId >= 0);
+        return OptionalInt.of(bucketId);
       }
 
       final int bucketId = Utilities.parseSplitBucket(inputSplit);
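
With PartitionAwareSplit.getBucketId() now returning a plain int (see the interface change below), the contract is that a partition-aware split always yields a valid, non-negative bucket id, and the OptionalInt wrapping moves up to this caller. A condensed sketch of that contract; the helper name bucketIdOf is hypothetical:

// Caller-side illustration; mirrors the branch above with an invented helper name.
static OptionalInt bucketIdOf(InputSplit inputSplit) {
  if (inputSplit instanceof PartitionAwareSplit) {
    // The split itself throws if it has no grouping key, so any returned value is a real bucket id.
    int bucketId = ((PartitionAwareSplit) inputSplit).getBucketId();
    return OptionalInt.of(bucketId);
  }
  return OptionalInt.empty();  // other splits fall back to path-based bucket parsing
}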

ql/src/java/org/apache/hadoop/hive/ql/io/PartitionAwareSplit.java

Lines changed: 2 additions & 3 deletions
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hive.ql.io;
 
-import java.util.OptionalInt;
 import org.apache.hadoop.hive.common.classification.InterfaceStability.Unstable;
 
 /**
@@ -27,8 +26,8 @@
 @Unstable
 public interface PartitionAwareSplit {
   /**
-   * Returns the bucket number of this split. OptionalInt.empty if this is not a bucketed split.
+   * Returns the bucket number of this split
    */
   @Unstable
-  OptionalInt getBucketId();
+  int getBucketId();
 }
