diff --git a/pom.xml b/pom.xml
index a30a4c985..1a3ed5642 100644
--- a/pom.xml
+++ b/pom.xml
@@ -87,7 +87,7 @@
2.43.0
0.16.1
1.8
- 0.5.0
+ 3.3.0
3.0.0
UTF-8
**/target/**
diff --git a/xtable-core/src/main/java/org/apache/xtable/avro/AvroSchemaConverter.java b/xtable-core/src/main/java/org/apache/xtable/avro/AvroSchemaConverter.java
index 9f40d29e8..9b4c3eacb 100644
--- a/xtable-core/src/main/java/org/apache/xtable/avro/AvroSchemaConverter.java
+++ b/xtable-core/src/main/java/org/apache/xtable/avro/AvroSchemaConverter.java
@@ -160,11 +160,15 @@ private InternalSchema toInternalSchema(
metadata.put(
InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MICROS);
} else if (logicalType instanceof LogicalTypes.LocalTimestampMillis) {
- newDataType = InternalType.TIMESTAMP_NTZ;
+ // TODO: https://github.com/apache/incubator-xtable/issues/672
+ // Hudi 0.x writes INT64 in parquet, TimestampNTZType support added in 1.x
+ newDataType = InternalType.LONG;
metadata.put(
InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MILLIS);
} else if (logicalType instanceof LogicalTypes.LocalTimestampMicros) {
- newDataType = InternalType.TIMESTAMP_NTZ;
+ // TODO: https://github.com/apache/incubator-xtable/issues/672
+ // Hudi 0.x writes INT64 in parquet, TimestampNTZType support added in 1.x
+ newDataType = InternalType.LONG;
metadata.put(
InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MICROS);
} else {
@@ -350,6 +354,22 @@ private Schema fromInternalSchema(InternalSchema internalSchema, String currentP
case INT:
return finalizeSchema(Schema.create(Schema.Type.INT), internalSchema);
case LONG:
+ // A LONG carrying TIMESTAMP_PRECISION metadata maps to an Avro local-timestamp type.
+ if (internalSchema.getMetadata() != null
+ && internalSchema
+ .getMetadata()
+ .containsKey(InternalSchema.MetadataKey.TIMESTAMP_PRECISION)) {
+ if (internalSchema.getMetadata().get(InternalSchema.MetadataKey.TIMESTAMP_PRECISION)
+ == InternalSchema.MetadataValue.MILLIS) {
+ return finalizeSchema(
+ LogicalTypes.localTimestampMillis().addToSchema(Schema.create(Schema.Type.LONG)),
+ internalSchema);
+ }
+ // Every precision value other than MILLIS is emitted as local-timestamp-micros.
+ return finalizeSchema(
+ LogicalTypes.localTimestampMicros().addToSchema(Schema.create(Schema.Type.LONG)),
+ internalSchema);
+ }
return finalizeSchema(Schema.create(Schema.Type.LONG), internalSchema);
case STRING:
return finalizeSchema(Schema.create(Schema.Type.STRING), internalSchema);
diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java
index b487e2cbc..30d629215 100644
--- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java
+++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java
@@ -73,9 +73,9 @@
@Log4j2
public class DeltaConversionTarget implements ConversionTarget {
- private static final String MIN_READER_VERSION = String.valueOf(1);
+ private static final int MIN_READER_VERSION = 1;
// gets access to generated columns.
- private static final String MIN_WRITER_VERSION = String.valueOf(4);
+ private static final int MIN_WRITER_VERSION = 4;
private DeltaLog deltaLog;
private DeltaSchemaExtractor schemaExtractor;
@@ -329,8 +329,14 @@ private void commitTransaction() {
private Map getConfigurationsForDeltaSync() {
Map configMap = new HashMap<>();
- configMap.put(DeltaConfigs.MIN_READER_VERSION().key(), MIN_READER_VERSION);
- configMap.put(DeltaConfigs.MIN_WRITER_VERSION().key(), MIN_WRITER_VERSION);
+ configMap.put(
+ DeltaConfigs.MIN_READER_VERSION().key(),
+ String.valueOf(
+ Math.max(deltaLog.snapshot().protocol().minReaderVersion(), MIN_READER_VERSION)));
+ configMap.put(
+ DeltaConfigs.MIN_WRITER_VERSION().key(),
+ String.valueOf(
+ Math.max(deltaLog.snapshot().protocol().minWriterVersion(), MIN_WRITER_VERSION)));
configMap.put(TableSyncMetadata.XTABLE_METADATA, metadata.toJson());
// Sets retention for the Delta Log, does not impact underlying files in the table
configMap.put(
diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java
index d1303e842..1376f884e 100644
--- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java
+++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java
@@ -56,6 +56,11 @@
public class DeltaSchemaExtractor {
private static final String DELTA_COLUMN_MAPPING_ID = "delta.columnMapping.id";
private static final DeltaSchemaExtractor INSTANCE = new DeltaSchemaExtractor();
+ // Timestamps in Delta are microsecond precision by default
+ private static final Map
+ DEFAULT_TIMESTAMP_PRECISION_METADATA =
+ Collections.singletonMap(
+ InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MICROS);
public static DeltaSchemaExtractor getInstance() {
return INSTANCE;
@@ -110,11 +115,11 @@ private InternalSchema toInternalSchema(
break;
case "timestamp":
type = InternalType.TIMESTAMP;
- // Timestamps in Delta are microsecond precision by default
- metadata =
- Collections.singletonMap(
- InternalSchema.MetadataKey.TIMESTAMP_PRECISION,
- InternalSchema.MetadataValue.MICROS);
+ metadata = DEFAULT_TIMESTAMP_PRECISION_METADATA;
+ break;
+ case "timestamp_ntz":
+ type = InternalType.TIMESTAMP_NTZ;
+ metadata = DEFAULT_TIMESTAMP_PRECISION_METADATA;
break;
case "struct":
StructType structType = (StructType) dataType;
diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java
index 4cf825d77..4366bc025 100644
--- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java
+++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java
@@ -173,7 +173,6 @@ Type toIcebergType(InternalField field, AtomicInteger fieldIdTracker) {
case INT:
return Types.IntegerType.get();
case LONG:
- case TIMESTAMP_NTZ: // TODO - revisit this
return Types.LongType.get();
case BYTES:
return Types.BinaryType.get();
@@ -189,6 +188,8 @@ Type toIcebergType(InternalField field, AtomicInteger fieldIdTracker) {
return Types.DateType.get();
case TIMESTAMP:
return Types.TimestampType.withZone();
+ case TIMESTAMP_NTZ:
+ return Types.TimestampType.withoutZone();
case DOUBLE:
return Types.DoubleType.get();
case DECIMAL:
diff --git a/xtable-core/src/main/java/org/apache/xtable/schema/SparkSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/schema/SparkSchemaExtractor.java
index cda649415..8db9f6109 100644
--- a/xtable-core/src/main/java/org/apache/xtable/schema/SparkSchemaExtractor.java
+++ b/xtable-core/src/main/java/org/apache/xtable/schema/SparkSchemaExtractor.java
@@ -62,7 +62,6 @@ private DataType convertFieldType(InternalField field) {
case INT:
return DataTypes.IntegerType;
case LONG:
- case TIMESTAMP_NTZ:
return DataTypes.LongType;
case BYTES:
case FIXED:
@@ -76,6 +75,8 @@ private DataType convertFieldType(InternalField field) {
return DataTypes.DateType;
case TIMESTAMP:
return DataTypes.TimestampType;
+ case TIMESTAMP_NTZ:
+ return DataTypes.TimestampNTZType;
case DOUBLE:
return DataTypes.DoubleType;
case DECIMAL:
diff --git a/xtable-core/src/test/java/org/apache/xtable/avro/TestAvroSchemaConverter.java b/xtable-core/src/test/java/org/apache/xtable/avro/TestAvroSchemaConverter.java
index 0b6823a1f..4e2991989 100644
--- a/xtable-core/src/test/java/org/apache/xtable/avro/TestAvroSchemaConverter.java
+++ b/xtable-core/src/test/java/org/apache/xtable/avro/TestAvroSchemaConverter.java
@@ -576,7 +576,7 @@ public void testAvroLogicalTypes() {
.schema(
InternalSchema.builder()
.name("long")
- .dataType(InternalType.TIMESTAMP_NTZ)
+ .dataType(InternalType.LONG)
.isNullable(false)
.metadata(millisMetadata)
.build())
@@ -586,7 +586,7 @@ public void testAvroLogicalTypes() {
.schema(
InternalSchema.builder()
.name("long")
- .dataType(InternalType.TIMESTAMP_NTZ)
+ .dataType(InternalType.LONG)
.isNullable(false)
.metadata(microsMetadata)
.build())
diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java
index 9c235a198..81ab34d24 100644
--- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java
+++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java
@@ -322,13 +322,36 @@ public void testTimestamps() {
.metadata(metadata)
.build())
.defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+ .build(),
+ InternalField.builder()
+ .name("requiredTimestampNtz")
+ .schema(
+ InternalSchema.builder()
+ .name("timestamp_ntz")
+ .dataType(InternalType.TIMESTAMP_NTZ)
+ .isNullable(false)
+ .metadata(metadata)
+ .build())
+ .build(),
+ InternalField.builder()
+ .name("optionalTimestampNtz")
+ .schema(
+ InternalSchema.builder()
+ .name("timestamp_ntz")
+ .dataType(InternalType.TIMESTAMP_NTZ)
+ .isNullable(true)
+ .metadata(metadata)
+ .build())
+ .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
.build()))
.build();
StructType structRepresentationTimestamp =
new StructType()
.add("requiredTimestamp", DataTypes.TimestampType, false)
- .add("optionalTimestamp", DataTypes.TimestampType, true);
+ .add("optionalTimestamp", DataTypes.TimestampType, true)
+ .add("requiredTimestampNtz", DataTypes.TimestampNTZType, false)
+ .add("optionalTimestampNtz", DataTypes.TimestampNTZType, true);
Assertions.assertEquals(
internalSchemaTimestamp,
diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java
index 7e921efe5..fc8a42c2f 100644
--- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java
+++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java
@@ -53,6 +53,7 @@
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -94,6 +95,7 @@
import org.apache.xtable.model.storage.DataLayoutStrategy;
import org.apache.xtable.model.storage.FileFormat;
import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.InternalFile;
import org.apache.xtable.model.storage.PartitionFileGroup;
import org.apache.xtable.model.storage.TableFormat;
import org.apache.xtable.schema.SchemaFieldFinder;
@@ -431,6 +433,39 @@ public void testGetTargetCommitIdentifierWithNullSourceIdentifier() throws Excep
assertFalse(unmappedTargetId.isPresent());
}
+ /**
+ * Syncs two snapshots whose schema contains a TIMESTAMP_NTZ column (the second snapshot's schema
+ * additionally has a nullable float column) and, after each sync, checks that the row count
+ * Spark reads from the Delta table equals the summed record counts of the expected files.
+ */
+ @Test
+ public void testTimestampNtz() {
+ InternalSchema schema1 = getInternalSchemaWithTimestampNtz();
+ InternalField floatField =
+ InternalField.builder()
+ .name("float_field")
+ .schema(
+ InternalSchema.builder()
+ .name("float")
+ .dataType(InternalType.FLOAT)
+ .isNullable(true)
+ .build())
+ .build();
+ List<InternalField> fields2 = new ArrayList<>(schema1.getFields());
+ fields2.add(floatField);
+ // schema2 is schema1 with one extra column appended.
+ InternalSchema schema2 = schema1.toBuilder().fields(fields2).build();
+ InternalTable table1 = getInternalTable(tableName, basePath, schema1, null, LAST_COMMIT_TIME);
+ InternalTable table2 = getInternalTable(tableName, basePath, schema2, null, LAST_COMMIT_TIME);
+
+ InternalDataFile dataFile1 = getDataFile(1, Collections.emptyList(), basePath);
+ InternalDataFile dataFile2 = getDataFile(2, Collections.emptyList(), basePath);
+ InternalDataFile dataFile3 = getDataFile(3, Collections.emptyList(), basePath);
+
+ InternalSnapshot snapshot1 = buildSnapshot(table1, "0", dataFile1, dataFile2);
+ InternalSnapshot snapshot2 = buildSnapshot(table2, "1", dataFile2, dataFile3);
+
+ // First sync: the table is expected to hold data files 1 and 2.
+ TableFormatSync.getInstance()
+ .syncSnapshot(Collections.singletonList(conversionTarget), snapshot1);
+ validateDeltaTableUsingSpark(basePath, new HashSet<>(Arrays.asList(dataFile1, dataFile2)));
+ // Second sync: the table is expected to hold data files 2 and 3 under the wider schema.
+ TableFormatSync.getInstance()
+ .syncSnapshot(Collections.singletonList(conversionTarget), snapshot2);
+ validateDeltaTableUsingSpark(basePath, new HashSet<>(Arrays.asList(dataFile2, dataFile3)));
+ }
+
private static Stream timestampPartitionTestingArgs() {
return Stream.of(
Arguments.of(PartitionTransformType.YEAR),
@@ -472,6 +507,13 @@ private void validateDeltaTable(
internalDataFiles.size(), count, "Number of files from DeltaScan don't match expectation");
}
+ /** Asserts that the row count Spark reads from the Delta table equals the files' record sum. */
+ private void validateDeltaTableUsingSpark(
+ Path basePath, Set<InternalDataFile> internalDataFiles) {
+ long expectedRecords = internalDataFiles.stream().mapToLong(InternalFile::getRecordCount).sum();
+ long actualRecords = sparkSession.read().format("delta").load(basePath.toString()).count();
+ Assertions.assertEquals(expectedRecords, actualRecords);
+ }
+
private InternalSnapshot buildSnapshot(
InternalTable table, String sourceIdentifier, InternalDataFile... dataFiles) {
return InternalSnapshot.builder()
@@ -563,6 +605,25 @@ private InternalSchema getInternalSchema() {
.build();
}
+ /**
+ * Returns the schema produced by {@code getInternalSchema()} with one additional nullable
+ * TIMESTAMP_NTZ field named {@code timestamp_ntz_field} whose precision metadata is MICROS.
+ */
+ private InternalSchema getInternalSchemaWithTimestampNtz() {
+ InternalSchema baseSchema = getInternalSchema();
+ InternalSchema timestampNtzSchema =
+ InternalSchema.builder()
+ .name("time_ntz")
+ .dataType(InternalType.TIMESTAMP_NTZ)
+ .isNullable(true)
+ .metadata(
+ Collections.singletonMap(
+ InternalSchema.MetadataKey.TIMESTAMP_PRECISION,
+ InternalSchema.MetadataValue.MICROS))
+ .build();
+ // Copy the field list before appending so the base schema's own list is not modified.
+ List<InternalField> fields = new ArrayList<>(baseSchema.getFields());
+ fields.add(
+ InternalField.builder().name("timestamp_ntz_field").schema(timestampNtzSchema).build());
+ return baseSchema.toBuilder().fields(fields).build();
+ }
+
private static SparkSession buildSparkSession() {
SparkConf sparkConf =
new SparkConf()
diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java
index 287765413..824e22856 100644
--- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java
@@ -501,14 +501,18 @@ public void testTimestamps() {
1, "requiredTimestampMillis", Types.TimestampType.withZone()),
Types.NestedField.optional(
2, "optionalTimestampMillis", Types.TimestampType.withZone()),
- Types.NestedField.required(3, "requiredTimestampNtzMillis", Types.LongType.get()),
- Types.NestedField.optional(4, "optionalTimestampNtzMillis", Types.LongType.get()),
+ Types.NestedField.required(
+ 3, "requiredTimestampNtzMillis", Types.TimestampType.withoutZone()),
+ Types.NestedField.optional(
+ 4, "optionalTimestampNtzMillis", Types.TimestampType.withoutZone()),
Types.NestedField.required(
5, "requiredTimestampMicros", Types.TimestampType.withZone()),
Types.NestedField.optional(
6, "optionalTimestampMicros", Types.TimestampType.withZone()),
- Types.NestedField.required(7, "requiredTimestampNtzMicros", Types.LongType.get()),
- Types.NestedField.optional(8, "optionalTimestampNtzMicros", Types.LongType.get()));
+ Types.NestedField.required(
+ 7, "requiredTimestampNtzMicros", Types.TimestampType.withoutZone()),
+ Types.NestedField.optional(
+ 8, "optionalTimestampNtzMicros", Types.TimestampType.withoutZone()));
assertTrue(expectedTargetSchema.sameSchema(SCHEMA_EXTRACTOR.toIceberg(irSchema)));
Schema sourceSchema =
diff --git a/xtable-core/src/test/java/org/apache/xtable/schema/TestSparkSchemaExtractor.java b/xtable-core/src/test/java/org/apache/xtable/schema/TestSparkSchemaExtractor.java
index 59385f055..1a5d71b15 100644
--- a/xtable-core/src/test/java/org/apache/xtable/schema/TestSparkSchemaExtractor.java
+++ b/xtable-core/src/test/java/org/apache/xtable/schema/TestSparkSchemaExtractor.java
@@ -385,8 +385,8 @@ public void testTimestamps() {
StructType structRepresentationTimestampNtz =
new StructType()
- .add("requiredTimestampNtz", DataTypes.LongType, false)
- .add("optionalTimestampNtz", DataTypes.LongType, true);
+ .add("requiredTimestampNtz", DataTypes.TimestampNTZType, false)
+ .add("optionalTimestampNtz", DataTypes.TimestampNTZType, true);
Assertions.assertEquals(
structRepresentationTimestamp,