Skip to content

HIVE-28987: Iceberg: A faulty query predicate can compromise transaction isolation #5839

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ private InputFormatConfig() {
public static final String CATALOG_WAREHOUSE_TEMPLATE = "iceberg.catalog.%s.warehouse";
public static final String CATALOG_CLASS_TEMPLATE = "iceberg.catalog.%s.catalog-impl";
public static final String CATALOG_DEFAULT_CONFIG_PREFIX = "iceberg.catalog-default.";
public static final String QUERY_FILTERS = "iceberg.query.filters";
public static final String QUERY_FILTERS = "iceberg.query.filters.";

public enum InMemoryDataModel {
PIG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -452,20 +452,19 @@ private void commitTable(FileIO io, ExecutorService executor, OutputTable output
String branchName = null;

Long snapshotId = null;
Expression filterExpr = Expressions.alwaysTrue();
Expression filterExpr = null;

for (JobContext jobContext : jobContexts) {
JobConf conf = jobContext.getJobConf();

table = Optional.ofNullable(table).orElseGet(() -> Catalogs.loadTable(conf, catalogProperties));
branchName = conf.get(InputFormatConfig.OUTPUT_TABLE_SNAPSHOT_REF);
snapshotId = getSnapshotId(outputTable.table, branchName);

Expression jobContextFilterExpr = (Expression) SessionStateUtil.getResource(conf, InputFormatConfig.QUERY_FILTERS)
.orElse(Expressions.alwaysTrue());
if (!filterExpr.equals(jobContextFilterExpr)) {
filterExpr = Expressions.and(filterExpr, jobContextFilterExpr);
}
LOG.debug("Filter Expression :{}", filterExpr);
filterExpr = (Expression) SessionStateUtil.getResource(conf,
InputFormatConfig.QUERY_FILTERS + catalogProperties.get(Catalogs.NAME))
.orElse(null);

LOG.info("Committing job has started for table: {}, using location: {}",
table, generateJobLocation(outputTable.table.location(), conf, jobContext.getJobID()));

Expand All @@ -485,14 +484,15 @@ private void commitTable(FileIO io, ExecutorService executor, OutputTable output
deleteFiles.addAll(writeResults.deleteFiles());
replacedDataFiles.addAll(writeResults.replacedDataFiles());
referencedDataFiles.addAll(writeResults.referencedDataFiles());

mergedAndDeletedFiles.addAll(writeResults.mergedAndDeletedFiles());
}

dataFiles.removeIf(dataFile -> mergedAndDeletedFiles.contains(new Path(String.valueOf(dataFile.path()))));
deleteFiles.removeIf(deleteFile -> mergedAndDeletedFiles.contains(new Path(String.valueOf(deleteFile.path()))));

FilesForCommit filesForCommit = new FilesForCommit(dataFiles, deleteFiles, replacedDataFiles, referencedDataFiles,
Collections.emptySet());
Collections.emptySet());
long startTime = System.currentTimeMillis();

if (Operation.IOW != operation) {
Expand All @@ -505,7 +505,6 @@ private void commitTable(FileIO io, ExecutorService executor, OutputTable output
commitWrite(table, branchName, snapshotId, startTime, filesForCommit, operation, filterExpr);
}
} else {

RewritePolicy rewritePolicy = RewritePolicy.fromString(jobContexts.stream()
.findAny()
.map(x -> x.getJobConf().get(ConfVars.REWRITE_POLICY.varname))
Expand Down Expand Up @@ -558,7 +557,10 @@ private void commitWrite(Table table, String branchName, Long snapshotId, long s
if (snapshotId != null) {
write.validateFromSnapshot(snapshotId);
}
write.conflictDetectionFilter(filterExpr);
if (filterExpr != null) {
LOG.debug("Conflict detection Filter Expression :{}", filterExpr);
write.conflictDetectionFilter(filterExpr);
}
write.validateNoConflictingData();
write.validateNoConflictingDeletes();
commit(write);
Expand All @@ -584,8 +586,10 @@ private void commitWrite(Table table, String branchName, Long snapshotId, long s
if (snapshotId != null) {
write.validateFromSnapshot(snapshotId);
}
write.conflictDetectionFilter(filterExpr);

if (filterExpr != null) {
LOG.debug("Conflict detection Filter Expression :{}", filterExpr);
write.conflictDetectionFilter(filterExpr);
}
if (!results.dataFiles().isEmpty()) {
write.validateDeletedFiles();
write.validateNoConflictingDeleteFiles();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,8 +419,17 @@ public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer dese
}
predicate.pushedPredicate = (ExprNodeGenericFuncDesc) pushedPredicate;
Expression filterExpr = HiveIcebergInputFormat.getFilterExpr(conf, predicate.pushedPredicate);

if (filterExpr != null) {
SessionStateUtil.addResource(conf, InputFormatConfig.QUERY_FILTERS, filterExpr);
String filterKey = InputFormatConfig.QUERY_FILTERS + jobConf.get(Catalogs.NAME);
Expression prevFilterExpr = (Expression) SessionStateUtil.getResource(conf, filterKey)
.orElse(null);
if (prevFilterExpr == null) {
SessionStateUtil.addResource(conf, filterKey, filterExpr);
} else if (!prevFilterExpr.isEquivalentTo(filterExpr)) {
// disable the conflict detection Filter in case of multiple TableScan operators
SessionStateUtil.addResource(conf, filterKey, Expressions.alwaysTrue());
}
}
return predicate;
}
Expand Down
66 changes: 29 additions & 37 deletions ql/src/java/org/apache/hadoop/hive/ql/session/SessionStateUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,20 @@

package org.apache.hadoop.hive.ql.session;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.QueryState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SessionStateUtil {

private static final Logger LOG = LoggerFactory.getLogger(SessionStateUtil.class);
private static final String COMMIT_INFO_PREFIX = "COMMIT_INFO.";
public static final String DEFAULT_TABLE_LOCATION = "defaultLocation";

private SessionStateUtil() {

}

/**
Expand All @@ -44,7 +41,8 @@ private SessionStateUtil() {
* could not be found
*/
public static Optional<Object> getResource(Configuration conf, String key) {
return getQueryState(conf).map(state -> state.getResource(key));
return getQueryState(conf)
.map(queryState -> queryState.getResource(key));
}

/**
Expand All @@ -54,29 +52,24 @@ public static Optional<Object> getResource(Configuration conf, String key) {
* resource itself could not be found, or the resource is not of type String
*/
public static Optional<String> getProperty(Configuration conf, String key) {
return getResource(conf, key).filter(o -> o instanceof String).map(o -> (String) o);
return getResource(conf, key).filter(obj -> obj instanceof String)
.map(obj -> (String) obj);
}

/**
* @param conf Configuration object used for getting the query state, should contain the query id
* @param key The resource identifier
* @param resource The resource to save into the QueryState
* @return whether operation succeeded
*/
public static boolean addResource(Configuration conf, String key, Object resource) {
Optional<QueryState> queryState = getQueryState(conf);
if (queryState.isPresent()) {
queryState.get().addResource(key, resource);
return true;
} else {
return false;
}
/**
 * Stores the given resource in the current {@code QueryState} under {@code key}.
 * Silently does nothing when no query state can be resolved from the session
 * (i.e. {@link #getQueryState} returns an empty Optional); callers that need a
 * hard failure on a missing query state should use {@code addResourceOrThrow}.
 *
 * @param conf Configuration used to look up the query state; should contain the query id
 * @param key The resource identifier
 * @param resource The resource to save into the QueryState
 */
public static void addResource(Configuration conf, String key, Object resource) {
// NOTE(review): unlike the previous boolean-returning variant, failure to find
// the query state is no longer observable by the caller — it is a silent no-op.
getQueryState(conf)
.ifPresent(queryState -> queryState.addResource(key, resource));
}

public static void addResourceOrThrow(Configuration conf, String key, Object resource) {
getQueryState(conf)
.orElseThrow(() -> new IllegalStateException("Query state is missing; failed to add resource for " + key))
.addResource(key, resource);
.orElseThrow(() -> new IllegalStateException("Query state is missing; failed to add resource for " + key))
.addResource(key, resource);
}

/**
Expand All @@ -85,7 +78,8 @@ public static void addResourceOrThrow(Configuration conf, String key, Object res
* @return the CommitInfo map. Key: jobId, Value: {@link CommitInfo}, or empty Optional if not present
*/
public static Optional<Map<String, CommitInfo>> getCommitInfo(Configuration conf, String tableName) {
return getResource(conf, COMMIT_INFO_PREFIX + tableName).map(o -> (Map<String, CommitInfo>)o);
return getResource(conf, COMMIT_INFO_PREFIX + tableName)
.map(obj -> (Map<String, CommitInfo>) obj);
}

/**
Expand All @@ -94,30 +88,28 @@ public static Optional<Map<String, CommitInfo>> getCommitInfo(Configuration conf
* @param jobId The job ID
* @param taskNum The number of successful tasks for the job
* @param additionalProps Any additional properties related to the job commit
* @return whether the operation succeeded
*/
public static boolean addCommitInfo(Configuration conf, String tableName, String jobId, int taskNum,
Map<String, String> additionalProps) {
public static void addCommitInfo(
Configuration conf, String tableName, String jobId, int taskNum, Map<String, String> additionalProps) {

CommitInfo commitInfo = new CommitInfo()
.withJobID(jobId)
.withTaskNum(taskNum)
.withProps(additionalProps);

Optional<Map<String, CommitInfo>> commitInfoMap = getCommitInfo(conf, tableName);
if (commitInfoMap.isPresent()) {
commitInfoMap.get().put(jobId, commitInfo);
return true;
}

Map<String, CommitInfo> newCommitInfoMap = new HashMap<>();
newCommitInfoMap.put(jobId, commitInfo);

return addResource(conf, COMMIT_INFO_PREFIX + tableName, newCommitInfoMap);
.withJobID(jobId)
.withTaskNum(taskNum)
.withProps(additionalProps);

getCommitInfo(conf, tableName)
.ifPresentOrElse(commitInfoMap -> commitInfoMap.put(jobId, commitInfo),
() -> {
Map<String, CommitInfo> newCommitInfoMap = Maps.newHashMap();
newCommitInfoMap.put(jobId, commitInfo);

addResource(conf, COMMIT_INFO_PREFIX + tableName, newCommitInfoMap);
});
}

public static Optional<QueryState> getQueryState(Configuration conf) {
return Optional.ofNullable(SessionState.get()).map(ss -> ss.getQueryState(HiveConf.getQueryId(conf)));
return Optional.ofNullable(SessionState.get())
.map(ss -> ss.getQueryState(HiveConf.getQueryId(conf)));
}

/**
Expand Down
Loading