Skip to content

Commit

Permalink
Bypass file path check for sync partition
Browse files Browse the repository at this point in the history
  • Loading branch information
fgwang7w authored and yingsu00 committed Jan 7, 2023
1 parent 13109ed commit bb88317
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class HdfsContext
private final Optional<String> tablePath;
// true if the table already exist in the metastore, false if the table is about to be created in the current transaction
private final Optional<Boolean> isNewTable;
private final Optional<Boolean> isPathValidationNeeded;
private final Optional<String> clientInfo;
private final Optional<Set<String>> clientTags;
private final Optional<ConnectorSession> session;
Expand All @@ -53,6 +54,7 @@ public HdfsContext(ConnectorIdentity identity)
this.tablePath = Optional.empty();
this.isNewTable = Optional.empty();
this.session = Optional.empty();
this.isPathValidationNeeded = Optional.empty();
}

/**
Expand Down Expand Up @@ -92,6 +94,22 @@ public HdfsContext(ConnectorSession session, String schemaName, String tableName
Optional.empty());
}

public HdfsContext(
ConnectorSession session,
String schemaName,
String tableName,
String tablePath,
boolean isNewTable,
boolean isPathValidationNeeded)
{
this(
session,
Optional.of(requireNonNull(schemaName, "schemaName is null")),
Optional.of(requireNonNull(tableName, "tableName is null")),
Optional.of(requireNonNull(tablePath, "tablePath is null")),
Optional.of(isNewTable),
Optional.of(isPathValidationNeeded));
}
public HdfsContext(
ConnectorSession session,
String schemaName,
Expand All @@ -104,7 +122,8 @@ public HdfsContext(
Optional.of(requireNonNull(schemaName, "schemaName is null")),
Optional.of(requireNonNull(tableName, "tableName is null")),
Optional.of(requireNonNull(tablePath, "tablePath is null")),
Optional.of(isNewTable));
Optional.of(isNewTable),
Optional.empty());
}

private HdfsContext(
Expand All @@ -113,6 +132,22 @@ private HdfsContext(
Optional<String> tableName,
Optional<String> tablePath,
Optional<Boolean> isNewTable)
{
this(
session,
schemaName,
tableName,
tablePath,
isNewTable,
Optional.empty());
}
private HdfsContext(
ConnectorSession session,
Optional<String> schemaName,
Optional<String> tableName,
Optional<String> tablePath,
Optional<Boolean> isNewTable,
Optional<Boolean> isPathValidationNeeded)
{
this.session = Optional.of(requireNonNull(session, "session is null"));
this.identity = requireNonNull(session.getIdentity(), "session.getIdentity() is null");
Expand All @@ -124,6 +159,7 @@ private HdfsContext(
this.clientTags = Optional.of(session.getClientTags());
this.tablePath = requireNonNull(tablePath, "tablePath is null");
this.isNewTable = requireNonNull(isNewTable, "isNewTable is null");
this.isPathValidationNeeded = requireNonNull(isPathValidationNeeded, "isPathValidationNeeded is null");
}

public ConnectorIdentity getIdentity()
Expand Down Expand Up @@ -176,6 +212,11 @@ public Optional<ConnectorSession> getSession()
return session;
}

public Optional<Boolean> getIsPathValidationNeeded()
{
return isPathValidationNeeded;
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -823,12 +823,39 @@ public synchronized void addPartition(
Partition partition,
Path currentLocation,
PartitionStatistics statistics)
{
addPartition(session, databaseName, tableName, tablePath, isNewTable, partition, currentLocation, statistics, false);
}

/**
* Add a new partition metadata in metastore
*
* @param session Connector level session
* @param databaseName Name of the schema
* @param tableName Name of the table
* @param tablePath Storage location of the table
* @param isNewTable The new partition is from an existing table or a new table
* @param partition The new partition object to be added
* @param currentLocation The path for which the partition is added in the table
* @param statistics The basic statistics and column statistics for the added partition
* @param noNeedToValidatePath check metastore file path. True for no check which is enabled by the sync partition code path only
*/
public synchronized void addPartition(
ConnectorSession session,
String databaseName,
String tableName,
String tablePath,
boolean isNewTable,
Partition partition,
Path currentLocation,
PartitionStatistics statistics,
boolean isPathValidationNeeded)
{
setShared();
checkArgument(getPrestoQueryId(partition).isPresent());
Map<List<String>, Action<PartitionAndMore>> partitionActionsOfTable = partitionActions.computeIfAbsent(new SchemaTableName(databaseName, tableName), k -> new HashMap<>());
Action<PartitionAndMore> oldPartitionAction = partitionActionsOfTable.get(partition.getValues());
HdfsContext context = new HdfsContext(session, databaseName, tableName, tablePath, isNewTable);
HdfsContext context = new HdfsContext(session, databaseName, tableName, tablePath, isNewTable, isPathValidationNeeded);
if (oldPartitionAction == null) {
partitionActionsOfTable.put(
partition.getValues(),
Expand Down Expand Up @@ -1488,19 +1515,23 @@ private void prepareAddPartition(MetastoreContext metastoreContext, HdfsContext
partition.getSchemaTableName(),
ignored -> new PartitionAdder(metastoreContext, partition.getDatabaseName(), partition.getTableName(), delegate, PARTITION_COMMIT_BATCH_SIZE));

if (pathExists(context, hdfsEnvironment, currentPath)) {
if (!targetPath.equals(currentPath)) {
renameDirectory(
context,
hdfsEnvironment,
currentPath,
targetPath,
() -> cleanUpTasksForAbort.add(new DirectoryCleanUpTask(context, targetPath, true)));
// we can bypass the file storage path checking logic for sync partition code path
// because the file paths have been verified during early phase of the sync logic already
if (!context.getIsPathValidationNeeded().orElse(false)) {
if (pathExists(context, hdfsEnvironment, currentPath)) {
if (!targetPath.equals(currentPath)) {
renameDirectory(
context,
hdfsEnvironment,
currentPath,
targetPath,
() -> cleanUpTasksForAbort.add(new DirectoryCleanUpTask(context, targetPath, true)));
}
}
else {
cleanUpTasksForAbort.add(new DirectoryCleanUpTask(context, targetPath, true));
createDirectory(context, hdfsEnvironment, targetPath);
}
}
else {
cleanUpTasksForAbort.add(new DirectoryCleanUpTask(context, targetPath, true));
createDirectory(context, hdfsEnvironment, targetPath);
}
String partitionName = getPartitionName(metastoreContext, partition.getDatabaseName(), partition.getTableName(), partition.getValues());
partitionAdder.addPartition(new PartitionWithStatistics(partition, partitionName, partitionAndMore.getStatisticsUpdate()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ private static void addPartitions(
false,
buildPartitionObject(session, table, name),
new Path(table.getStorage().getLocation(), name),
PartitionStatistics.empty());
PartitionStatistics.empty(),
true);
}
}

Expand Down

0 comments on commit bb88317

Please sign in to comment.