45 changes: 21 additions & 24 deletions pom.xml
@@ -68,30 +68,32 @@
<lombok.version>1.18.36</lombok.version>
<lombok-maven-plugin.version>1.18.20.0</lombok-maven-plugin.version>
<hadoop.version>3.4.1</hadoop.version>
<hudi.version>0.14.0</hudi.version>
<hudi.version>1.1.0-SNAPSHOT</hudi.version>
<aws.version>2.29.40</aws.version>
<hive.version>2.3.9</hive.version>
<databricks.version>0.41.0</databricks.version>
<aws.httpclient.version>4.5.13</aws.httpclient.version>
<hive.version>3.1.3</hive.version>
<maven-source-plugin.version>3.3.1</maven-source-plugin.version>
<maven-javadoc-plugin.version>3.8.0</maven-javadoc-plugin.version>
<maven-gpg-plugin.version>3.2.4</maven-gpg-plugin.version>
<maven-deploy-plugin.version>3.1.1</maven-deploy-plugin.version>
<maven-release-plugin.version>2.5.3</maven-release-plugin.version>
<mockito.version>5.15.2</mockito.version>
<parquet.version>1.15.1</parquet.version>
<parquet.version>1.13.1</parquet.version>
<protobuf.version>3.25.5</protobuf.version>
<scala12.version>2.12.20</scala12.version>
<scala13.version>2.13.15</scala13.version>
<scala.version>${scala12.version}</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<spark.version>3.4.2</spark.version>
<spark.version.prefix>3.4</spark.version.prefix>
<iceberg.version>1.4.2</iceberg.version>
<delta.version>2.4.0</delta.version>
<spark.version>3.5.2</spark.version>
<spark.version.prefix>3.5</spark.version.prefix>
<iceberg.version>1.5.2</iceberg.version>
<delta.version>3.0.0</delta.version>
Contributor
Can we pull the spark, iceberg, and delta version upgrades into their own PR? It would help shrink the size of this PR

Contributor
+1, we can split it into version upgrades and iceberg pluggable tf PRs

<jackson.version>2.18.2</jackson.version>
<spotless.version>2.43.0</spotless.version>
<apache.rat.version>0.16.1</apache.rat.version>
<google.java.format.version>1.8</google.java.format.version>
<delta.standalone.version>3.3.0</delta.standalone.version>
<delta.standalone.version>0.5.0</delta.standalone.version>
<delta.hive.version>3.0.0</delta.hive.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<target.dir.pattern>**/target/**</target.dir.pattern>
@@ -163,21 +165,6 @@
<artifactId>parquet-avro</artifactId>
<version>${parquet.version}</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>${parquet.version}</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-common</artifactId>
<version>${parquet.version}</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-column</artifactId>
<version>${parquet.version}</version>
</dependency>

<!-- Logging -->
<dependency>
@@ -267,6 +254,12 @@
<version>${hudi.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-utilities_2.12</artifactId>
<version>${hudi.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-spark${spark.version.prefix}-bundle_${scala.binary.version}</artifactId>
@@ -318,7 +311,7 @@
<!-- Delta -->
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-core_${scala.binary.version}</artifactId>
<artifactId>delta-spark_${scala.binary.version}</artifactId>
<version>${delta.version}</version>
</dependency>
<dependency>
@@ -700,6 +693,9 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<release>${maven.compiler.target}</release>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
@@ -716,6 +712,7 @@
<redirectTestOutputToFile>true</redirectTestOutputToFile>
<trimStackTrace>false</trimStackTrace>
<forkedProcessExitTimeoutInSeconds>120</forkedProcessExitTimeoutInSeconds>
<argLine>@{argLine} -Xmx1024m</argLine>
</configuration>
</plugin>
<plugin>
@@ -52,4 +52,6 @@ public class InternalTable {
Instant latestCommitTime;
// Path to latest metadata
String latestMetdataPath;
// latest operation on the table.
String latestTableOperationId;
}
@@ -56,6 +56,7 @@ public class TableSyncMetadata {
int version;
String sourceTableFormat;
String sourceIdentifier;
String latestTableOperationId;

/**
* @deprecated Use {@link #of(Instant, List, String, String)} instead. This method exists for
@@ -64,20 +65,22 @@
@Deprecated
public static TableSyncMetadata of(
Instant lastInstantSynced, List<Instant> instantsToConsiderForNextSync) {
return TableSyncMetadata.of(lastInstantSynced, instantsToConsiderForNextSync, null, null);
return TableSyncMetadata.of(lastInstantSynced, instantsToConsiderForNextSync, null, null, null);
}

public static TableSyncMetadata of(
Instant lastInstantSynced,
List<Instant> instantsToConsiderForNextSync,
String sourceTableFormat,
String sourceIdentifier) {
String sourceIdentifier,
String latestTableOperationId) {
return new TableSyncMetadata(
lastInstantSynced,
instantsToConsiderForNextSync,
CURRENT_VERSION,
sourceTableFormat,
sourceIdentifier);
sourceIdentifier,
latestTableOperationId);
}

public String toJson() {
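Reviewer note: a minimal usage sketch of the widened TableSyncMetadata.of(...) factory method above. The instant, table format, and id values are made up, and the import of TableSyncMetadata from this module is omitted; callers of the old four-argument overload now supply the latest table operation id explicitly (or null when there is nothing to record).

import java.time.Instant;
import java.util.Collections;

class TableSyncMetadataUsageSketch {
  // Illustrative call to the five-argument of(...) introduced in this PR.
  static TableSyncMetadata example() {
    return TableSyncMetadata.of(
        Instant.parse("2024-01-21T12:15:30.00Z"), // lastInstantSynced
        Collections.emptyList(),                  // instantsToConsiderForNextSync
        "HUDI",                                   // sourceTableFormat
        "20240121121530000",                      // sourceIdentifier (made-up value)
        "20240121121530000");                     // latestTableOperationId (new in this PR)
  }
}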
@@ -168,7 +168,8 @@ private SyncResult getSyncResult(
tableState.getLatestCommitTime(),
pendingCommits,
tableState.getTableFormat(),
sourceIdentifier);
sourceIdentifier,
tableState.getLatestTableOperationId());
conversionTarget.syncMetadata(latestState);
// sync schema updates
conversionTarget.syncSchema(tableState.getReadSchema());
@@ -178,6 +179,11 @@
fileSyncMethod.sync(conversionTarget);
conversionTarget.completeSync();

log.info(
"Took {} sec in mode {} to sync table change for {}",
Duration.between(startTime, Instant.now()).getSeconds(),
mode,
conversionTarget.getTableFormat());
return SyncResult.builder()
.mode(mode)
.tableFormatSyncStatus(SyncResult.SyncStatus.SUCCESS)
@@ -66,14 +66,19 @@ private static Stream<Arguments> provideMetadataAndJson() {
Instant.parse("2020-08-21T11:15:30.00Z"),
Instant.parse("2024-01-21T12:15:30.00Z")),
"TEST",
"0"),
"0",
null),
"{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"instantsToConsiderForNextSync\":[\"2020-08-21T11:15:30Z\",\"2024-01-21T12:15:30Z\"],\"version\":0,\"sourceTableFormat\":\"TEST\",\"sourceIdentifier\":\"0\"}"),
Arguments.of(
TableSyncMetadata.of(
Instant.parse("2020-07-04T10:15:30.00Z"), Collections.emptyList(), "TEST", "0"),
Instant.parse("2020-07-04T10:15:30.00Z"),
Collections.emptyList(),
"TEST",
"0",
null),
"{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"instantsToConsiderForNextSync\":[],\"version\":0,\"sourceTableFormat\":\"TEST\",\"sourceIdentifier\":\"0\"}"),
Arguments.of(
TableSyncMetadata.of(Instant.parse("2020-07-04T10:15:30.00Z"), null, "TEST", "0"),
TableSyncMetadata.of(Instant.parse("2020-07-04T10:15:30.00Z"), null, "TEST", "0", null),
"{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"version\":0,\"sourceTableFormat\":\"TEST\",\"sourceIdentifier\":\"0\"}"));
}

@@ -173,11 +173,11 @@ void syncChangesWithFailureForOneFormat() {
conversionTargetWithMetadata.put(
mockConversionTarget1,
TableSyncMetadata.of(
Instant.now().minus(1, ChronoUnit.HOURS), Collections.emptyList(), "TEST", "0"));
Instant.now().minus(1, ChronoUnit.HOURS), Collections.emptyList(), "TEST", "0", null));
conversionTargetWithMetadata.put(
mockConversionTarget2,
TableSyncMetadata.of(
Instant.now().minus(1, ChronoUnit.HOURS), Collections.emptyList(), "TEST", "1"));
Instant.now().minus(1, ChronoUnit.HOURS), Collections.emptyList(), "TEST", "1", null));

Map<String, List<SyncResult>> result =
TableFormatSync.getInstance()
@@ -297,15 +297,17 @@ void syncChangesWithDifferentFormatsAndMetadata() {
tableChange2.getTableAsOfChange().getLatestCommitTime(),
Collections.singletonList(tableChange1.getTableAsOfChange().getLatestCommitTime()),
"TEST",
tableChange2.getSourceIdentifier()));
tableChange2.getSourceIdentifier(),
null));
// mockConversionTarget2 will have synced the first table change previously
conversionTargetWithMetadata.put(
mockConversionTarget2,
TableSyncMetadata.of(
tableChange1.getTableAsOfChange().getLatestCommitTime(),
Collections.emptyList(),
"TEST",
tableChange1.getSourceIdentifier()));
tableChange1.getSourceIdentifier(),
null));

Map<String, List<SyncResult>> result =
TableFormatSync.getInstance()
@@ -399,12 +401,12 @@ void syncChangesOneFormatWithNoRequiredChanges() {
// mockConversionTarget1 will have nothing to sync
conversionTargetWithMetadata.put(
mockConversionTarget1,
TableSyncMetadata.of(Instant.now(), Collections.emptyList(), "TEST", "0"));
TableSyncMetadata.of(Instant.now(), Collections.emptyList(), "TEST", "0", null));
// mockConversionTarget2 will have synced the first table change previously
conversionTargetWithMetadata.put(
mockConversionTarget2,
TableSyncMetadata.of(
Instant.now().minus(1, ChronoUnit.HOURS), Collections.emptyList(), "TEST", "1"));
Instant.now().minus(1, ChronoUnit.HOURS), Collections.emptyList(), "TEST", "1", null));

Map<String, List<SyncResult>> result =
TableFormatSync.getInstance()
@@ -479,6 +481,7 @@ private void verifyBaseConversionTargetCalls(
startingTableState.getLatestCommitTime(),
pendingCommitInstants,
startingTableState.getTableFormat(),
sourceIdentifier));
sourceIdentifier,
null));
}
}
2 changes: 1 addition & 1 deletion xtable-core/pom.xml
@@ -102,7 +102,7 @@
<!-- Delta dependencies -->
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-core_${scala.binary.version}</artifactId>
<artifactId>delta-spark_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>io.delta</groupId>
@@ -31,6 +31,7 @@
import org.apache.spark.sql.delta.actions.Action;
import org.apache.spark.sql.delta.actions.AddFile;

import scala.Option;
import scala.collection.JavaConverters;
import scala.collection.Seq;

@@ -122,7 +123,9 @@ private Stream<AddFile> createAddFileAction(
true,
getColumnStats(schema, dataFile.getRecordCount(), dataFile.getColumnStats()),
null,
null));
null,
Option.empty(),
Option.empty()));
}

private String getColumnStats(
@@ -34,22 +34,27 @@

import lombok.AllArgsConstructor;
import lombok.NonNull;
import lombok.SneakyThrows;
import lombok.Value;

import org.apache.hadoop.fs.Path;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieDeltaWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.util.ExternalFilePathUtil;
import org.apache.hudi.hadoop.CachingPath;
import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
import org.apache.hudi.hadoop.fs.CachingPath;
import org.apache.hudi.metadata.HoodieTableMetadata;

import org.apache.xtable.collectors.CustomCollectors;
import org.apache.xtable.model.schema.InternalType;
@@ -74,6 +79,7 @@ public class BaseFileUpdatesExtractor {
* @param commit The current commit started by the Hudi client
* @return The information needed to create a "replace" commit for the Hudi table
*/
@SneakyThrows
ReplaceMetadata extractSnapshotChanges(
List<PartitionFileGroup> partitionedDataFiles,
HoodieTableMetaClient metaClient,
@@ -82,16 +88,53 @@ ReplaceMetadata extractSnapshotChanges(
HoodieMetadataConfig.newBuilder()
.enable(metaClient.getTableConfig().isMetadataTableAvailable())
.build();
HoodieTableFileSystemView fsView =
new HoodieMetadataFileSystemView(
engineContext, metaClient, metaClient.getActiveTimeline(), metadataConfig);
HoodieTableMetadata tableMetadata =
metadataConfig.isEnabled()
? metaClient
.getTableFormat()
.getMetadataFactory()
.create(
engineContext,
metaClient.getStorage(),
metadataConfig,
tableBasePath.toString(),
true)
: null;
FileSystemViewManager fileSystemViewManager =
FileSystemViewManager.createViewManager(
engineContext,
metadataConfig,
FileSystemViewStorageConfig.newBuilder()
.withStorageType(FileSystemViewStorageType.MEMORY)
.build(),
HoodieCommonConfig.newBuilder().build(),
meta -> tableMetadata);
try (SyncableFileSystemView fsView = fileSystemViewManager.getFileSystemView(metaClient)) {
return extractFromFsView(partitionedDataFiles, commit, fsView, metaClient, metadataConfig);
} finally {
fileSystemViewManager.close();
if (tableMetadata != null) {
tableMetadata.close();
}
}
}

ReplaceMetadata extractFromFsView(
List<PartitionFileGroup> partitionedDataFiles,
String commit,
SyncableFileSystemView fsView,
HoodieTableMetaClient metaClient,
HoodieMetadataConfig metadataConfig) {
boolean isTableInitialized = metaClient.isTimelineNonEmpty();
// Track the partitions that are not present in the snapshot, so the files for those partitions
// can be dropped
Set<String> partitionPathsToDrop =
new HashSet<>(
FSUtils.getAllPartitionPaths(
engineContext, metadataConfig, metaClient.getBasePathV2().toString()));
engineContext,
metaClient.getStorage(),
metadataConfig,
metaClient.getBasePath().toString()));
ReplaceMetadata replaceMetadata =
partitionedDataFiles.stream()
.map(
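Reviewer note: the hunk above replaces the old single HoodieMetadataFileSystemView constructor call with an explicitly managed view. A condensed sketch of the resource lifecycle it now follows, using only the Hudi classes imported in this file; variable names and the createMetadataReaderIfEnabled helper are illustrative, and error handling is elided.

// Sketch of the ownership rules in extractSnapshotChanges after this PR: the
// file-system view is closed first (try-with-resources), then the view manager,
// then the optional metadata-table reader created for it.
HoodieTableMetadata tableMetadata = createMetadataReaderIfEnabled(); // hypothetical helper
FileSystemViewManager viewManager =
    FileSystemViewManager.createViewManager(
        engineContext,
        metadataConfig,
        FileSystemViewStorageConfig.newBuilder()
            .withStorageType(FileSystemViewStorageType.MEMORY)
            .build(),
        HoodieCommonConfig.newBuilder().build(),
        unused -> tableMetadata);
try (SyncableFileSystemView fsView = viewManager.getFileSystemView(metaClient)) {
  // read file groups and build the ReplaceMetadata from fsView
} finally {
  viewManager.close();
  if (tableMetadata != null) {
    tableMetadata.close();
  }
}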