From ff91b1dfba3d9c3ac57a2299cfef33365e5c03f1 Mon Sep 17 00:00:00 2001 From: Vinish Reddy Date: Tue, 1 Apr 2025 14:48:46 -0700 Subject: [PATCH] Handle timestamp_ntz in delta and iceberg --- pom.xml | 2 +- .../xtable/avro/AvroSchemaConverter.java | 24 +++++++- .../xtable/delta/DeltaConversionTarget.java | 14 +++-- .../xtable/delta/DeltaSchemaExtractor.java | 15 +++-- .../iceberg/IcebergSchemaExtractor.java | 3 +- .../xtable/schema/SparkSchemaExtractor.java | 3 +- .../xtable/avro/TestAvroSchemaConverter.java | 4 +- .../delta/TestDeltaSchemaExtractor.java | 25 +++++++- .../apache/xtable/delta/TestDeltaSync.java | 61 +++++++++++++++++++ .../iceberg/TestIcebergSchemaExtractor.java | 12 ++-- .../schema/TestSparkSchemaExtractor.java | 4 +- 11 files changed, 144 insertions(+), 23 deletions(-) diff --git a/pom.xml b/pom.xml index a30a4c985..1a3ed5642 100644 --- a/pom.xml +++ b/pom.xml @@ -87,7 +87,7 @@ 2.43.0 0.16.1 1.8 - 0.5.0 + 3.3.0 3.0.0 UTF-8 **/target/** diff --git a/xtable-core/src/main/java/org/apache/xtable/avro/AvroSchemaConverter.java b/xtable-core/src/main/java/org/apache/xtable/avro/AvroSchemaConverter.java index 9f40d29e8..9b4c3eacb 100644 --- a/xtable-core/src/main/java/org/apache/xtable/avro/AvroSchemaConverter.java +++ b/xtable-core/src/main/java/org/apache/xtable/avro/AvroSchemaConverter.java @@ -160,11 +160,15 @@ private InternalSchema toInternalSchema( metadata.put( InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MICROS); } else if (logicalType instanceof LogicalTypes.LocalTimestampMillis) { - newDataType = InternalType.TIMESTAMP_NTZ; + // TODO: https://github.com/apache/incubator-xtable/issues/672 + // Hudi 0.x writes INT64 in parquet, TimestampNTZType support added in 1.x + newDataType = InternalType.LONG; metadata.put( InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MILLIS); } else if (logicalType instanceof LogicalTypes.LocalTimestampMicros) { - newDataType = InternalType.TIMESTAMP_NTZ; + // TODO: https://github.com/apache/incubator-xtable/issues/672 + // Hudi 0.x writes INT64 in parquet, TimestampNTZType support added in 1.x + newDataType = InternalType.LONG; metadata.put( InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MICROS); } else { @@ -350,6 +354,22 @@ private Schema fromInternalSchema(InternalSchema internalSchema, String currentP case INT: return finalizeSchema(Schema.create(Schema.Type.INT), internalSchema); case LONG: + if (internalSchema.getMetadata() != null + && internalSchema + .getMetadata() + .containsKey(InternalSchema.MetadataKey.TIMESTAMP_PRECISION)) { + if (internalSchema.getMetadata().get(InternalSchema.MetadataKey.TIMESTAMP_PRECISION) + == InternalSchema.MetadataValue.MILLIS) { + return finalizeSchema( + LogicalTypes.localTimestampMillis().addToSchema(Schema.create(Schema.Type.LONG)), + internalSchema); + } + { + return finalizeSchema( + LogicalTypes.localTimestampMicros().addToSchema(Schema.create(Schema.Type.LONG)), + internalSchema); + } + } return finalizeSchema(Schema.create(Schema.Type.LONG), internalSchema); case STRING: return finalizeSchema(Schema.create(Schema.Type.STRING), internalSchema); diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java index b487e2cbc..30d629215 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java @@ -73,9 +73,9 @@ @Log4j2 public class DeltaConversionTarget implements ConversionTarget { - private static final String MIN_READER_VERSION = String.valueOf(1); + private static final int MIN_READER_VERSION = 1; // gets access to generated columns. - private static final String MIN_WRITER_VERSION = String.valueOf(4); + private static final int MIN_WRITER_VERSION = 4; private DeltaLog deltaLog; private DeltaSchemaExtractor schemaExtractor; @@ -329,8 +329,14 @@ private void commitTransaction() { private Map getConfigurationsForDeltaSync() { Map configMap = new HashMap<>(); - configMap.put(DeltaConfigs.MIN_READER_VERSION().key(), MIN_READER_VERSION); - configMap.put(DeltaConfigs.MIN_WRITER_VERSION().key(), MIN_WRITER_VERSION); + configMap.put( + DeltaConfigs.MIN_READER_VERSION().key(), + String.valueOf( + Math.max(deltaLog.snapshot().protocol().minReaderVersion(), MIN_READER_VERSION))); + configMap.put( + DeltaConfigs.MIN_WRITER_VERSION().key(), + String.valueOf( + Math.max(deltaLog.snapshot().protocol().minWriterVersion(), MIN_WRITER_VERSION))); configMap.put(TableSyncMetadata.XTABLE_METADATA, metadata.toJson()); // Sets retention for the Delta Log, does not impact underlying files in the table configMap.put( diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java index d1303e842..1376f884e 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java @@ -56,6 +56,11 @@ public class DeltaSchemaExtractor { private static final String DELTA_COLUMN_MAPPING_ID = "delta.columnMapping.id"; private static final DeltaSchemaExtractor INSTANCE = new DeltaSchemaExtractor(); + // Timestamps in Delta are microsecond precision by default + private static final Map + DEFAULT_TIMESTAMP_PRECISION_METADATA = + Collections.singletonMap( + InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MICROS); public static DeltaSchemaExtractor getInstance() { return INSTANCE; @@ -110,11 +115,11 @@ private InternalSchema toInternalSchema( break; case "timestamp": type = InternalType.TIMESTAMP; - // Timestamps in Delta are microsecond precision by default - metadata = - Collections.singletonMap( - InternalSchema.MetadataKey.TIMESTAMP_PRECISION, - InternalSchema.MetadataValue.MICROS); + metadata = DEFAULT_TIMESTAMP_PRECISION_METADATA; + break; + case "timestamp_ntz": + type = InternalType.TIMESTAMP_NTZ; + metadata = DEFAULT_TIMESTAMP_PRECISION_METADATA; break; case "struct": StructType structType = (StructType) dataType; diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java index 4cf825d77..4366bc025 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java @@ -173,7 +173,6 @@ Type toIcebergType(InternalField field, AtomicInteger fieldIdTracker) { case INT: return Types.IntegerType.get(); case LONG: - case TIMESTAMP_NTZ: // TODO - revisit this return Types.LongType.get(); case BYTES: return Types.BinaryType.get(); @@ -189,6 +188,8 @@ Type toIcebergType(InternalField field, AtomicInteger fieldIdTracker) { return Types.DateType.get(); case TIMESTAMP: return Types.TimestampType.withZone(); + case TIMESTAMP_NTZ: + return Types.TimestampType.withoutZone(); case DOUBLE: return Types.DoubleType.get(); case DECIMAL: diff --git a/xtable-core/src/main/java/org/apache/xtable/schema/SparkSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/schema/SparkSchemaExtractor.java index cda649415..8db9f6109 100644 --- a/xtable-core/src/main/java/org/apache/xtable/schema/SparkSchemaExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/schema/SparkSchemaExtractor.java @@ -62,7 +62,6 @@ private DataType convertFieldType(InternalField field) { case INT: return DataTypes.IntegerType; case LONG: - case TIMESTAMP_NTZ: return DataTypes.LongType; case BYTES: case FIXED: @@ -76,6 +75,8 @@ private DataType convertFieldType(InternalField field) { return DataTypes.DateType; case TIMESTAMP: return DataTypes.TimestampType; + case TIMESTAMP_NTZ: + return DataTypes.TimestampNTZType; case DOUBLE: return DataTypes.DoubleType; case DECIMAL: diff --git a/xtable-core/src/test/java/org/apache/xtable/avro/TestAvroSchemaConverter.java b/xtable-core/src/test/java/org/apache/xtable/avro/TestAvroSchemaConverter.java index 0b6823a1f..4e2991989 100644 --- a/xtable-core/src/test/java/org/apache/xtable/avro/TestAvroSchemaConverter.java +++ b/xtable-core/src/test/java/org/apache/xtable/avro/TestAvroSchemaConverter.java @@ -576,7 +576,7 @@ public void testAvroLogicalTypes() { .schema( InternalSchema.builder() .name("long") - .dataType(InternalType.TIMESTAMP_NTZ) + .dataType(InternalType.LONG) .isNullable(false) .metadata(millisMetadata) .build()) @@ -586,7 +586,7 @@ public void testAvroLogicalTypes() { .schema( InternalSchema.builder() .name("long") - .dataType(InternalType.TIMESTAMP_NTZ) + .dataType(InternalType.LONG) .isNullable(false) .metadata(microsMetadata) .build()) diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java index 9c235a198..81ab34d24 100644 --- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java +++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java @@ -322,13 +322,36 @@ public void testTimestamps() { .metadata(metadata) .build()) .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build(), + InternalField.builder() + .name("requiredTimestampNtz") + .schema( + InternalSchema.builder() + .name("timestamp_ntz") + .dataType(InternalType.TIMESTAMP_NTZ) + .isNullable(false) + .metadata(metadata) + .build()) + .build(), + InternalField.builder() + .name("optionalTimestampNtz") + .schema( + InternalSchema.builder() + .name("timestamp_ntz") + .dataType(InternalType.TIMESTAMP_NTZ) + .isNullable(true) + .metadata(metadata) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) .build())) .build(); StructType structRepresentationTimestamp = new StructType() .add("requiredTimestamp", DataTypes.TimestampType, false) - .add("optionalTimestamp", DataTypes.TimestampType, true); + .add("optionalTimestamp", DataTypes.TimestampType, true) + .add("requiredTimestampNtz", DataTypes.TimestampNTZType, false) + .add("optionalTimestampNtz", DataTypes.TimestampNTZType, true); Assertions.assertEquals( internalSchemaTimestamp, diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java index 7e921efe5..fc8a42c2f 100644 --- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java +++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java @@ -53,6 +53,7 @@ import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataTypes; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -94,6 +95,7 @@ import org.apache.xtable.model.storage.DataLayoutStrategy; import org.apache.xtable.model.storage.FileFormat; import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.InternalFile; import org.apache.xtable.model.storage.PartitionFileGroup; import org.apache.xtable.model.storage.TableFormat; import org.apache.xtable.schema.SchemaFieldFinder; @@ -431,6 +433,39 @@ public void testGetTargetCommitIdentifierWithNullSourceIdentifier() throws Excep assertFalse(unmappedTargetId.isPresent()); } + @Test + public void testTimestampNtz() { + InternalSchema schema1 = getInternalSchemaWithTimestampNtz(); + List fields2 = new ArrayList<>(schema1.getFields()); + fields2.add( + InternalField.builder() + .name("float_field") + .schema( + InternalSchema.builder() + .name("float") + .dataType(InternalType.FLOAT) + .isNullable(true) + .build()) + .build()); + InternalSchema schema2 = getInternalSchema().toBuilder().fields(fields2).build(); + InternalTable table1 = getInternalTable(tableName, basePath, schema1, null, LAST_COMMIT_TIME); + InternalTable table2 = getInternalTable(tableName, basePath, schema2, null, LAST_COMMIT_TIME); + + InternalDataFile dataFile1 = getDataFile(1, Collections.emptyList(), basePath); + InternalDataFile dataFile2 = getDataFile(2, Collections.emptyList(), basePath); + InternalDataFile dataFile3 = getDataFile(3, Collections.emptyList(), basePath); + + InternalSnapshot snapshot1 = buildSnapshot(table1, "0", dataFile1, dataFile2); + InternalSnapshot snapshot2 = buildSnapshot(table2, "1", dataFile2, dataFile3); + + TableFormatSync.getInstance() + .syncSnapshot(Collections.singletonList(conversionTarget), snapshot1); + validateDeltaTableUsingSpark(basePath, new HashSet<>(Arrays.asList(dataFile1, dataFile2))); + TableFormatSync.getInstance() + .syncSnapshot(Collections.singletonList(conversionTarget), snapshot2); + validateDeltaTableUsingSpark(basePath, new HashSet<>(Arrays.asList(dataFile2, dataFile3))); + } + private static Stream timestampPartitionTestingArgs() { return Stream.of( Arguments.of(PartitionTransformType.YEAR), @@ -472,6 +507,13 @@ private void validateDeltaTable( internalDataFiles.size(), count, "Number of files from DeltaScan don't match expectation"); } + private void validateDeltaTableUsingSpark( + Path basePath, Set internalDataFiles) { + Dataset dataset = sparkSession.read().format("delta").load(basePath.toString()); + long countFromFiles = internalDataFiles.stream().mapToLong(InternalFile::getRecordCount).sum(); + Assertions.assertEquals(countFromFiles, dataset.count()); + } + private InternalSnapshot buildSnapshot( InternalTable table, String sourceIdentifier, InternalDataFile... dataFiles) { return InternalSnapshot.builder() @@ -563,6 +605,25 @@ private InternalSchema getInternalSchema() { .build(); } + private InternalSchema getInternalSchemaWithTimestampNtz() { + Map timestampMetadata = new HashMap<>(); + timestampMetadata.put( + InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MICROS); + List fields = new ArrayList<>(getInternalSchema().getFields()); + fields.add( + InternalField.builder() + .name("timestamp_ntz_field") + .schema( + InternalSchema.builder() + .name("time_ntz") + .dataType(InternalType.TIMESTAMP_NTZ) + .isNullable(true) + .metadata(timestampMetadata) + .build()) + .build()); + return getInternalSchema().toBuilder().fields(fields).build(); + } + private static SparkSession buildSparkSession() { SparkConf sparkConf = new SparkConf() diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java index 287765413..824e22856 100644 --- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java +++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java @@ -501,14 +501,18 @@ public void testTimestamps() { 1, "requiredTimestampMillis", Types.TimestampType.withZone()), Types.NestedField.optional( 2, "optionalTimestampMillis", Types.TimestampType.withZone()), - Types.NestedField.required(3, "requiredTimestampNtzMillis", Types.LongType.get()), - Types.NestedField.optional(4, "optionalTimestampNtzMillis", Types.LongType.get()), + Types.NestedField.required( + 3, "requiredTimestampNtzMillis", Types.TimestampType.withoutZone()), + Types.NestedField.optional( + 4, "optionalTimestampNtzMillis", Types.TimestampType.withoutZone()), Types.NestedField.required( 5, "requiredTimestampMicros", Types.TimestampType.withZone()), Types.NestedField.optional( 6, "optionalTimestampMicros", Types.TimestampType.withZone()), - Types.NestedField.required(7, "requiredTimestampNtzMicros", Types.LongType.get()), - Types.NestedField.optional(8, "optionalTimestampNtzMicros", Types.LongType.get())); + Types.NestedField.required( + 7, "requiredTimestampNtzMicros", Types.TimestampType.withoutZone()), + Types.NestedField.optional( + 8, "optionalTimestampNtzMicros", Types.TimestampType.withoutZone())); assertTrue(expectedTargetSchema.sameSchema(SCHEMA_EXTRACTOR.toIceberg(irSchema))); Schema sourceSchema = diff --git a/xtable-core/src/test/java/org/apache/xtable/schema/TestSparkSchemaExtractor.java b/xtable-core/src/test/java/org/apache/xtable/schema/TestSparkSchemaExtractor.java index 59385f055..1a5d71b15 100644 --- a/xtable-core/src/test/java/org/apache/xtable/schema/TestSparkSchemaExtractor.java +++ b/xtable-core/src/test/java/org/apache/xtable/schema/TestSparkSchemaExtractor.java @@ -385,8 +385,8 @@ public void testTimestamps() { StructType structRepresentationTimestampNtz = new StructType() - .add("requiredTimestampNtz", DataTypes.LongType, false) - .add("optionalTimestampNtz", DataTypes.LongType, true); + .add("requiredTimestampNtz", DataTypes.TimestampNTZType, false) + .add("optionalTimestampNtz", DataTypes.TimestampNTZType, true); Assertions.assertEquals( structRepresentationTimestamp,