Skip to content

Commit 8e41968

Browse files
committed
HIVE-28987: Iceberg: A faulty query predicate can compromise transaction isolation
1 parent de994e5 commit 8e41968

File tree

4 files changed

+56
-51
lines changed

4 files changed

+56
-51
lines changed

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ private InputFormatConfig() {
   public static final String CATALOG_WAREHOUSE_TEMPLATE = "iceberg.catalog.%s.warehouse";
   public static final String CATALOG_CLASS_TEMPLATE = "iceberg.catalog.%s.catalog-impl";
   public static final String CATALOG_DEFAULT_CONFIG_PREFIX = "iceberg.catalog-default.";
-  public static final String QUERY_FILTERS = "iceberg.query.filters";
+  public static final String QUERY_FILTERS = "iceberg.query.filters.";

   public enum InMemoryDataModel {
     PIG,

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -452,20 +452,19 @@ private void commitTable(FileIO io, ExecutorService executor, OutputTable output
     String branchName = null;

     Long snapshotId = null;
-    Expression filterExpr = Expressions.alwaysTrue();
+    Expression filterExpr = null;

     for (JobContext jobContext : jobContexts) {
       JobConf conf = jobContext.getJobConf();
+
       table = Optional.ofNullable(table).orElseGet(() -> Catalogs.loadTable(conf, catalogProperties));
       branchName = conf.get(InputFormatConfig.OUTPUT_TABLE_SNAPSHOT_REF);
       snapshotId = getSnapshotId(outputTable.table, branchName);

-      Expression jobContextFilterExpr = (Expression) SessionStateUtil.getResource(conf, InputFormatConfig.QUERY_FILTERS)
-          .orElse(Expressions.alwaysTrue());
-      if (!filterExpr.equals(jobContextFilterExpr)) {
-        filterExpr = Expressions.and(filterExpr, jobContextFilterExpr);
-      }
-      LOG.debug("Filter Expression :{}", filterExpr);
+      filterExpr = (Expression) SessionStateUtil.getResource(conf,
+          InputFormatConfig.QUERY_FILTERS + catalogProperties.get(Catalogs.NAME))
+          .orElse(null);
+
       LOG.info("Committing job has started for table: {}, using location: {}",
           table, generateJobLocation(outputTable.table.location(), conf, jobContext.getJobID()));

@@ -485,14 +484,15 @@ private void commitTable(FileIO io, ExecutorService executor, OutputTable output
       deleteFiles.addAll(writeResults.deleteFiles());
       replacedDataFiles.addAll(writeResults.replacedDataFiles());
       referencedDataFiles.addAll(writeResults.referencedDataFiles());
+
       mergedAndDeletedFiles.addAll(writeResults.mergedAndDeletedFiles());
     }

     dataFiles.removeIf(dataFile -> mergedAndDeletedFiles.contains(new Path(String.valueOf(dataFile.path()))));
     deleteFiles.removeIf(deleteFile -> mergedAndDeletedFiles.contains(new Path(String.valueOf(deleteFile.path()))));

     FilesForCommit filesForCommit = new FilesForCommit(dataFiles, deleteFiles, replacedDataFiles, referencedDataFiles,
-          Collections.emptySet());
+        Collections.emptySet());
     long startTime = System.currentTimeMillis();

     if (Operation.IOW != operation) {
@@ -505,7 +505,6 @@ private void commitTable(FileIO io, ExecutorService executor, OutputTable output
         commitWrite(table, branchName, snapshotId, startTime, filesForCommit, operation, filterExpr);
       }
     } else {
-
       RewritePolicy rewritePolicy = RewritePolicy.fromString(jobContexts.stream()
           .findAny()
           .map(x -> x.getJobConf().get(ConfVars.REWRITE_POLICY.varname))
@@ -558,7 +557,10 @@ private void commitWrite(Table table, String branchName, Long snapshotId, long s
       if (snapshotId != null) {
        write.validateFromSnapshot(snapshotId);
      }
-     write.conflictDetectionFilter(filterExpr);
+     if (filterExpr != null) {
+       LOG.debug("Conflict detection Filter Expression :{}", filterExpr);
+       write.conflictDetectionFilter(filterExpr);
+     }
      write.validateNoConflictingData();
      write.validateNoConflictingDeletes();
      commit(write);
@@ -584,8 +586,10 @@ private void commitWrite(Table table, String branchName, Long snapshotId, long s
      if (snapshotId != null) {
        write.validateFromSnapshot(snapshotId);
      }
-     write.conflictDetectionFilter(filterExpr);
-
+     if (filterExpr != null) {
+       LOG.debug("Conflict detection Filter Expression :{}", filterExpr);
+       write.conflictDetectionFilter(filterExpr);
+     }
      if (!results.dataFiles().isEmpty()) {
        write.validateDeletedFiles();
        write.validateNoConflictingDeleteFiles();

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -419,8 +419,17 @@ public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer dese
     }
     predicate.pushedPredicate = (ExprNodeGenericFuncDesc) pushedPredicate;
     Expression filterExpr = HiveIcebergInputFormat.getFilterExpr(conf, predicate.pushedPredicate);
+
     if (filterExpr != null) {
-      SessionStateUtil.addResource(conf, InputFormatConfig.QUERY_FILTERS, filterExpr);
+      String filterKey = InputFormatConfig.QUERY_FILTERS + jobConf.get(Catalogs.NAME);
+      Expression prevFilterExpr = (Expression) SessionStateUtil.getResource(conf, filterKey)
+          .orElse(null);
+      if (prevFilterExpr == null) {
+        SessionStateUtil.addResource(conf, filterKey, filterExpr);
+      } else if (!prevFilterExpr.isEquivalentTo(filterExpr)) {
+        // disable the conflict detection Filter in case of multiple TableScan operators
+        SessionStateUtil.addResource(conf, filterKey, Expressions.alwaysTrue());
+      }
     }
     return predicate;
   }

ql/src/java/org/apache/hadoop/hive/ql/session/SessionStateUtil.java

Lines changed: 29 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,20 @@

 package org.apache.hadoop.hive.ql.session;

-import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
+
+import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.QueryState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

 public class SessionStateUtil {

-  private static final Logger LOG = LoggerFactory.getLogger(SessionStateUtil.class);
   private static final String COMMIT_INFO_PREFIX = "COMMIT_INFO.";
   public static final String DEFAULT_TABLE_LOCATION = "defaultLocation";

   private SessionStateUtil() {
-
   }

   /**
@@ -44,7 +41,8 @@ private SessionStateUtil() {
    * could not be found
    */
   public static Optional<Object> getResource(Configuration conf, String key) {
-    return getQueryState(conf).map(state -> state.getResource(key));
+    return getQueryState(conf)
+        .map(queryState -> queryState.getResource(key));
   }

   /**
@@ -54,29 +52,24 @@ public static Optional<Object> getResource(Configuration conf, String key) {
    * resource itself could not be found, or the resource is not of type String
    */
   public static Optional<String> getProperty(Configuration conf, String key) {
-    return getResource(conf, key).filter(o -> o instanceof String).map(o -> (String) o);
+    return getResource(conf, key).filter(obj -> obj instanceof String)
+        .map(obj -> (String) obj);
   }

   /**
    * @param conf Configuration object used for getting the query state, should contain the query id
    * @param key The resource identifier
    * @param resource The resource to save into the QueryState
-   * @return whether operation succeeded
    */
-  public static boolean addResource(Configuration conf, String key, Object resource) {
-    Optional<QueryState> queryState = getQueryState(conf);
-    if (queryState.isPresent()) {
-      queryState.get().addResource(key, resource);
-      return true;
-    } else {
-      return false;
-    }
+  public static void addResource(Configuration conf, String key, Object resource) {
+    getQueryState(conf)
+        .ifPresent(queryState -> queryState.addResource(key, resource));
   }

   public static void addResourceOrThrow(Configuration conf, String key, Object resource) {
     getQueryState(conf)
-      .orElseThrow(() -> new IllegalStateException("Query state is missing; failed to add resource for " + key))
-      .addResource(key, resource);
+        .orElseThrow(() -> new IllegalStateException("Query state is missing; failed to add resource for " + key))
+        .addResource(key, resource);
   }

   /**
@@ -85,7 +78,8 @@ public static void addResourceOrThrow(Configuration conf, String key, Object res
    * @return the CommitInfo map. Key: jobId, Value: {@link CommitInfo}, or empty Optional if not present
    */
   public static Optional<Map<String, CommitInfo>> getCommitInfo(Configuration conf, String tableName) {
-    return getResource(conf, COMMIT_INFO_PREFIX + tableName).map(o -> (Map<String, CommitInfo>)o);
+    return getResource(conf, COMMIT_INFO_PREFIX + tableName)
+        .map(obj -> (Map<String, CommitInfo>) obj);
   }

   /**
@@ -94,30 +88,28 @@ public static Optional<Map<String, CommitInfo>> getCommitInfo(Configuration conf
    * @param jobId The job ID
    * @param taskNum The number of successful tasks for the job
    * @param additionalProps Any additional properties related to the job commit
-   * @return whether the operation succeeded
    */
-  public static boolean addCommitInfo(Configuration conf, String tableName, String jobId, int taskNum,
-      Map<String, String> additionalProps) {
+  public static void addCommitInfo(
+      Configuration conf, String tableName, String jobId, int taskNum, Map<String, String> additionalProps) {

     CommitInfo commitInfo = new CommitInfo()
-            .withJobID(jobId)
-            .withTaskNum(taskNum)
-            .withProps(additionalProps);
-
-    Optional<Map<String, CommitInfo>> commitInfoMap = getCommitInfo(conf, tableName);
-    if (commitInfoMap.isPresent()) {
-      commitInfoMap.get().put(jobId, commitInfo);
-      return true;
-    }
-
-    Map<String, CommitInfo> newCommitInfoMap = new HashMap<>();
-    newCommitInfoMap.put(jobId, commitInfo);
-
-    return addResource(conf, COMMIT_INFO_PREFIX + tableName, newCommitInfoMap);
+        .withJobID(jobId)
+        .withTaskNum(taskNum)
+        .withProps(additionalProps);
+
+    getCommitInfo(conf, tableName)
+        .ifPresentOrElse(commitInfoMap -> commitInfoMap.put(jobId, commitInfo),
+            () -> {
+              Map<String, CommitInfo> newCommitInfoMap = Maps.newHashMap();
+              newCommitInfoMap.put(jobId, commitInfo);
+
+              addResource(conf, COMMIT_INFO_PREFIX + tableName, newCommitInfoMap);
+            });
   }

   public static Optional<QueryState> getQueryState(Configuration conf) {
-    return Optional.ofNullable(SessionState.get()).map(ss -> ss.getQueryState(HiveConf.getQueryId(conf)));
+    return Optional.ofNullable(SessionState.get())
+        .map(ss -> ss.getQueryState(HiveConf.getQueryId(conf)));
   }

   /**

0 commit comments

Comments (0)