From 0267c3329adc6163efdc14daaabdfa57580ceaf1 Mon Sep 17 00:00:00 2001 From: Rahul Sharma Date: Wed, 11 Jun 2025 11:11:53 +0000 Subject: [PATCH 01/14] Add round trip int 96 stats test --- ...TestInt96TimestampStatisticsRoundTrip.java | 156 ++++++++++++++++++ 1 file changed, 156 insertions(+) create mode 100644 parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestInt96TimestampStatisticsRoundTrip.java diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestInt96TimestampStatisticsRoundTrip.java b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestInt96TimestampStatisticsRoundTrip.java new file mode 100644 index 0000000000..19e8fb8bd6 --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestInt96TimestampStatisticsRoundTrip.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.statistics; + +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT96; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.time.LocalDateTime; +import java.util.Collections; +import java.util.List; +import java.util.ArrayList; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.NanoTime; +import org.apache.parquet.example.data.simple.SimpleGroup; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Types; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestInt96TimestampStatisticsRoundTrip { + @Rule + public final TemporaryFolder temp = new TemporaryFolder(); + + private MessageType createSchema() { + return Types.buildMessage() + .required(INT96).named("timestamp_field") + .named("root"); + } + + /** + * Convert a timestamp string in format "yyyy-MM-ddTHH:mm:ss.SSS" to INT96 bytes using NanoTime. 
+ * INT96 timestamps in Parquet are encoded as 12 bytes where: + * - First 8 bytes: nanoseconds from midnight + * - Last 4 bytes: Julian day + */ + private Binary timestampToInt96(String timestamp) { + LocalDateTime dt = LocalDateTime.parse(timestamp); + long julianDay = dt.toLocalDate().toEpochDay() + 2440588; // Convert to Julian Day + long nanos = dt.toLocalTime().toNanoOfDay(); + return new NanoTime((int)julianDay, nanos).toBinary(); + } + + private void writeParquetFile(Path file, List timestampValues) throws IOException { + MessageType schema = createSchema(); + Configuration conf = new Configuration(); + try (ParquetWriter writer = ExampleParquetWriter.builder(file) + .withConf(conf) + .withType(schema) + .build()) { + for (Binary value : timestampValues) { + Group group = new SimpleGroup(schema); + group.add("timestamp_field", value); + writer.write(group); + } + } + } + + private void verifyStatistics(Path file, Binary minValue, Binary maxValue, boolean readInt96Stats) throws IOException { + Configuration conf = new Configuration(); + conf.set("parquet.read.int96stats.enabled", readInt96Stats ? 
"true" : "false"); + ParquetMetadata metadata = ParquetFileReader.readFooter(conf, file); + + // Verify INT96 statistics + ColumnChunkMetaData timestampColumn = metadata.getBlocks().get(0).getColumns().get(0); + Statistics timestampStats = timestampColumn.getStatistics(); + + if (readInt96Stats) { + assertTrue("INT96 statistics have non-null values", timestampStats.hasNonNullValue()); + assertEquals(Binary.fromConstantByteArray(timestampStats.getMinBytes()), minValue); + assertEquals(Binary.fromConstantByteArray(timestampStats.getMaxBytes()), maxValue); + } else { + assertTrue("INT96 statistics should not be present", !timestampStats.hasNonNullValue()); + return; + } + } + + private void runTimestampTest(String[] timestamps) throws IOException { + Binary minValue = timestampToInt96(timestamps[0]); + Binary maxValue = timestampToInt96(timestamps[timestamps.length - 1]); + List timestampValues = new ArrayList<>(); + for (String timestamp : timestamps) { + timestampValues.add(timestampToInt96(timestamp)); + } + Collections.shuffle(timestampValues); + + Path file = new Path(temp.getRoot().getPath(), "test_timestamps.parquet"); + writeParquetFile(file, timestampValues); + verifyStatistics(file, minValue, maxValue, false); + verifyStatistics(file, minValue, maxValue, true); + } + + @Test + public void testMultipleDates() throws IOException { + String[] timestamps = { + "2020-01-01T00:00:00.000", + "2020-02-29T23:59:59.000", + "2020-12-31T23:59:59.000", + "2021-01-01T00:00:00.000", + "2023-06-15T12:30:45.000", + "2024-02-29T15:45:30.000", + "2024-12-25T07:00:00.000", + "2025-01-01T00:00:00.000", + "2025-07-04T20:00:00.000", + "2025-12-31T23:59:59.000" + }; + runTimestampTest(timestamps); + } + + @Test + public void testSameDayDifferentTime() throws IOException { + String[] timestamps = { + "2020-01-01T00:01:00.000", + "2020-01-01T00:02:00.000", + "2020-01-01T00:03:00.000" + }; + runTimestampTest(timestamps); + } + + @Test + public void testIncreasingDayDecreasingTime() 
throws IOException { + String[] timestamps = { + "2020-01-01T12:00:00.000", + "2020-02-01T11:00:00.000", + "2020-03-01T10:00:00.000" + }; + runTimestampTest(timestamps); + } +} From 9a6a2051774d102057a7aa100beaf2ab3a38a13a Mon Sep 17 00:00:00 2001 From: Rahul Sharma Date: Wed, 11 Jun 2025 14:12:44 +0000 Subject: [PATCH 02/14] ValidInt96Stats --- .../org/apache/parquet/ValidInt96Stats.java | 77 +++++++++++++++++++ .../apache/parquet/ValidInt96StatsTest.java | 57 ++++++++++++++ 2 files changed, 134 insertions(+) create mode 100644 parquet-column/src/main/java/org/apache/parquet/ValidInt96Stats.java create mode 100644 parquet-column/src/test/java/org/apache/parquet/ValidInt96StatsTest.java diff --git a/parquet-column/src/main/java/org/apache/parquet/ValidInt96Stats.java b/parquet-column/src/main/java/org/apache/parquet/ValidInt96Stats.java new file mode 100644 index 0000000000..9ce963493d --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/ValidInt96Stats.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet; + +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.parquet.SemanticVersion.SemanticVersionParseException; +import org.apache.parquet.VersionParser.ParsedVersion; +import org.apache.parquet.VersionParser.VersionParseException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Not all parquet writers populate the int96 statistics correctly. For example: arrow-rs + * https://github.com/apache/arrow-rs/blob/3ed9aedabc9e5a90170e43ff818f24a29eafb35b/parquet/src/file/statistics.rs#L212-L215 + * This class is used to detect whether a file was written with a version that has correct int96 statistics. + */ +public class ValidInt96Stats { + private static final AtomicBoolean alreadyLogged = new AtomicBoolean(false); + + private static final Logger LOG = LoggerFactory.getLogger(ValidInt96Stats.class); + + + /** + * Decides if the statistics from a file created by createdBy (the created_by field from parquet format) + * should be trusted for INT96 columns. 
+ * + * @param createdBy the created-by string from a file footer + * @return true if the statistics are valid and can be trusted, false otherwise + */ + public static boolean hasValidInt96Stats(String createdBy) { + if (Strings.isNullOrEmpty(createdBy)) { + warnOnce("Cannot verify INT96 statistics because created_by is null or empty"); + return false; + } + + try { + ParsedVersion version = VersionParser.parse(createdBy); + if ("parquet-mr".equals(version.application)) { + return true; + } + if ("parquet-mr compatible Photon".equals(version.application)) { + return true; + } + } catch (RuntimeException | VersionParseException e) { + warnParseErrorOnce(createdBy, e); + } + return false; + } + + private static void warnParseErrorOnce(String createdBy, Throwable e) { + if (!alreadyLogged.getAndSet(true)) { + LOG.warn("Cannot verify INT96 statistics because created_by could not be parsed: " + createdBy, e); + } + } + + private static void warnOnce(String message) { + if (!alreadyLogged.getAndSet(true)) { + LOG.warn(message); + } + } +} diff --git a/parquet-column/src/test/java/org/apache/parquet/ValidInt96StatsTest.java b/parquet-column/src/test/java/org/apache/parquet/ValidInt96StatsTest.java new file mode 100644 index 0000000000..edb3168af0 --- /dev/null +++ b/parquet-column/src/test/java/org/apache/parquet/ValidInt96StatsTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; + +public class ValidInt96StatsTest { + + @Test + public void testNullAndEmpty() { + assertFalse(ValidInt96Stats.hasValidInt96Stats(null)); + assertFalse(ValidInt96Stats.hasValidInt96Stats("")); + } + + @Test + public void testParquetMrValid() { + assertTrue(ValidInt96Stats.hasValidInt96Stats("parquet-mr version 1.12.3")); + assertTrue(ValidInt96Stats.hasValidInt96Stats("parquet-mr version 1.12.3 (build abcd)")); + assertTrue(ValidInt96Stats.hasValidInt96Stats("parquet-mr version 1.12.3-SNAPSHOT")); + assertTrue(ValidInt96Stats.hasValidInt96Stats("parquet-mr version 1.12.3rc1")); + assertTrue(ValidInt96Stats.hasValidInt96Stats("parquet-mr version 1.12.3rc1-SNAPSHOT")); + } + + @Test + public void testParquetMrCompatiblePhotonValid() { + assertTrue(ValidInt96Stats.hasValidInt96Stats("parquet-mr compatible Photon version 1.0.0")); + assertTrue(ValidInt96Stats.hasValidInt96Stats("parquet-mr compatible Photon version 1.0.0 (build abcd)")); + assertTrue(ValidInt96Stats.hasValidInt96Stats("parquet-mr compatible Photon version 1.0.0-SNAPSHOT")); + assertTrue(ValidInt96Stats.hasValidInt96Stats("parquet-mr compatible Photon version 1.0.0rc1")); + assertTrue(ValidInt96Stats.hasValidInt96Stats("parquet-mr compatible Photon version 1.0.0rc1-SNAPSHOT")); + } + + @Test + public void testInvalidApplications() { + assertFalse(ValidInt96Stats.hasValidInt96Stats("arrow-rs version 0.1.0")); + 
assertFalse(ValidInt96Stats.hasValidInt96Stats("impala version 1.6.0"));
+  }
+}

From 91e47db14568a021080ac0bf2c07f3684dcf0d1b Mon Sep 17 00:00:00 2001
From: Rahul Sharma
Date: Wed, 11 Jun 2025 16:20:15 +0000
Subject: [PATCH 03/14] Add BINARY_AS_INT_96_COMPARATOR

---
 .../parquet/schema/PrimitiveComparator.java   | 25 ++++++++++++++
 .../schema/TestPrimitiveComparator.java       | 33 +++++++++++++++++++
 2 files changed, 58 insertions(+)

diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java
index 50c4acd4c9..76312f9b6c 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java
@@ -206,6 +206,31 @@ public String toString() {
     }
   };
 
+  /*
+   * Compares two INT96 timestamps: 8 little-endian bytes of nanos-of-day, then 4 little-endian bytes of Julian day.
+   */
+  static final PrimitiveComparator<Binary> BINARY_AS_INT_96_COMPARATOR = new BinaryComparator() {
+    @Override
+    int compareBinary(Binary b1, Binary b2) {
+      ByteBuffer bb1 = b1.toByteBuffer();
+      ByteBuffer bb2 = b2.toByteBuffer();
+      bb1.order(java.nio.ByteOrder.LITTLE_ENDIAN);
+      bb2.order(java.nio.ByteOrder.LITTLE_ENDIAN);
+      int jd1 = bb1.getInt(8);
+      int jd2 = bb2.getInt(8);
+      if (jd1 != jd2) return Integer.compareUnsigned(jd1, jd2) < 0 ? -1 : 1;
+      long s1 = bb1.getLong(0);
+      long s2 = bb2.getLong(0);
+      if (s1 != s2) return Long.compareUnsigned(s1, s2) < 0 ? -1 : 1;
+      return 0;
+    }
+
+    @Override
+    public String toString() {
+      return "BINARY_AS_INT_96_COMPARATOR";
+    }
+  };
+
 /*
  * This comparator is for comparing two signed decimal values represented in twos-complement binary.
In case of the * binary length of one value is shorter than the other it will be padded by the corresponding prefix (0xFF for diff --git a/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java b/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java index d3d1b15bc6..e2a47a82ea 100644 --- a/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java +++ b/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java @@ -20,6 +20,7 @@ import static org.apache.parquet.schema.PrimitiveComparator.BINARY_AS_FLOAT16_COMPARATOR; import static org.apache.parquet.schema.PrimitiveComparator.BINARY_AS_SIGNED_INTEGER_COMPARATOR; +import static org.apache.parquet.schema.PrimitiveComparator.BINARY_AS_INT_96_COMPARATOR; import static org.apache.parquet.schema.PrimitiveComparator.BOOLEAN_COMPARATOR; import static org.apache.parquet.schema.PrimitiveComparator.DOUBLE_COMPARATOR; import static org.apache.parquet.schema.PrimitiveComparator.FLOAT_COMPARATOR; @@ -33,9 +34,11 @@ import java.math.BigInteger; import java.nio.ByteBuffer; +import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; import org.apache.parquet.io.api.Binary; +import org.apache.parquet.example.data.simple.NanoTime; import org.junit.Test; /* @@ -274,6 +277,36 @@ public void testBinaryAsSignedIntegerComparatorWithEquals() { } } + private Binary timestampToInt96(String timestamp) { + LocalDateTime dt = LocalDateTime.parse(timestamp); + long julianDay = dt.toLocalDate().toEpochDay() + 2440588; // Convert to Julian Day + long nanos = dt.toLocalTime().toNanoOfDay(); + return new NanoTime((int)julianDay, nanos).toBinary(); + } + + @Test + public void testInt96Comparator() { + Binary[] valuesInAscendingOrder = { + timestampToInt96("2020-01-01T00:00:00.000"), + timestampToInt96("2020-02-29T23:59:59.999"), + timestampToInt96("2020-12-31T23:59:59.999"), + 
timestampToInt96("2021-01-01T00:00:00.000"), + timestampToInt96("2023-06-15T12:30:45.500"), + timestampToInt96("2024-02-29T15:45:30.750"), + timestampToInt96("2024-12-25T07:00:00.000"), + timestampToInt96("2025-01-01T00:00:00.000"), + timestampToInt96("2025-07-04T20:00:00.000"), + timestampToInt96("2025-12-31T23:59:59.999") + }; + for (int i = 0; i < valuesInAscendingOrder.length; ++i) { + for (int j = 0; j < valuesInAscendingOrder.length; ++j) { + Binary bi = valuesInAscendingOrder[i]; + Binary bj = valuesInAscendingOrder[j]; + assertEquals(Integer.compare(i, j), BINARY_AS_INT_96_COMPARATOR.compare(bi, bj)); + } + } + } + @Test public void testFloat16Comparator() { Binary[] valuesInAscendingOrder = { From beb7ad3021b9902ad8158c9b8988d9038ab59aa4 Mon Sep 17 00:00:00 2001 From: Rahul Sharma Date: Wed, 11 Jun 2025 16:41:31 +0000 Subject: [PATCH 04/14] Pass readInt96stats --- .../apache/parquet/schema/PrimitiveType.java | 13 +++--------- .../java/org/apache/parquet/schema/Types.java | 4 ++-- .../org/apache/parquet/HadoopReadOptions.java | 3 +++ .../apache/parquet/ParquetReadOptions.java | 20 +++++++++++++++++++ .../parquet/hadoop/ParquetInputFormat.java | 10 ++++++++++ 5 files changed, 38 insertions(+), 12 deletions(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java index 6beff4da93..81a8151ad1 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java @@ -363,7 +363,7 @@ public T convert(PrimitiveTypeNameConverter conve @Override PrimitiveComparator comparator(LogicalTypeAnnotation logicalType) { - return PrimitiveComparator.BINARY_AS_SIGNED_INTEGER_COMPARATOR; + return PrimitiveComparator.BINARY_AS_INT_96_COMPARATOR; } }, FIXED_LEN_BYTE_ARRAY("getBinary", Binary.class) { @@ -542,7 +542,7 @@ public PrimitiveType( this.decimalMeta = decimalMeta; if 
(columnOrder == null) { - columnOrder = primitive == PrimitiveTypeName.INT96 || originalType == OriginalType.INTERVAL + columnOrder = originalType == OriginalType.INTERVAL ? ColumnOrder.undefined() : ColumnOrder.typeDefined(); } @@ -587,8 +587,7 @@ public PrimitiveType( } if (columnOrder == null) { - columnOrder = primitive == PrimitiveTypeName.INT96 - || logicalTypeAnnotation instanceof LogicalTypeAnnotation.IntervalLogicalTypeAnnotation + columnOrder = logicalTypeAnnotation instanceof LogicalTypeAnnotation.IntervalLogicalTypeAnnotation ? ColumnOrder.undefined() : ColumnOrder.typeDefined(); } @@ -596,12 +595,6 @@ public PrimitiveType( } private ColumnOrder requireValidColumnOrder(ColumnOrder columnOrder) { - if (primitive == PrimitiveTypeName.INT96) { - Preconditions.checkArgument( - columnOrder.getColumnOrderName() == ColumnOrderName.UNDEFINED, - "The column order %s is not supported by INT96", - columnOrder); - } if (getLogicalTypeAnnotation() != null) { Preconditions.checkArgument( getLogicalTypeAnnotation().isValidColumnOrder(columnOrder), diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/Types.java b/parquet-column/src/main/java/org/apache/parquet/schema/Types.java index fd82d36768..0214eab259 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/Types.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/Types.java @@ -413,8 +413,8 @@ public THIS scale(int scale) { /** * Adds the column order for the primitive type. *

- * In case of not set the default column order is {@link ColumnOrderName#TYPE_DEFINED_ORDER} except the type - * {@link PrimitiveTypeName#INT96} and the types annotated by {@link OriginalType#INTERVAL} where the default column + * In case of not set the default column order is {@link ColumnOrderName#TYPE_DEFINED_ORDER} except the types + * annotated by {@link OriginalType#INTERVAL} where the default column * order is {@link ColumnOrderName#UNDEFINED}. * * @param columnOrder the column order for the primitive type diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java index e277fcde5f..eff05f5a7c 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java @@ -44,6 +44,7 @@ private HadoopReadOptions( boolean useBloomFilter, boolean useOffHeapDecryptBuffer, boolean useHadoopVectoredIo, + boolean readInt96Stats, FilterCompat.Filter recordFilter, MetadataFilter metadataFilter, CompressionCodecFactory codecFactory, @@ -63,6 +64,7 @@ private HadoopReadOptions( useBloomFilter, useOffHeapDecryptBuffer, useHadoopVectoredIo, + readInt96Stats, recordFilter, metadataFilter, codecFactory, @@ -126,6 +128,7 @@ public ParquetReadOptions build() { useBloomFilter, useOffHeapDecryptBuffer, useHadoopVectoredIo, + readInt96Stats, recordFilter, metadataFilter, codecFactory, diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java index 895d0670fa..c4945eadb9 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java @@ -22,12 +22,14 @@ import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; import static 
org.apache.parquet.hadoop.ParquetInputFormat.BLOOM_FILTERING_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED; +import static org.apache.parquet.hadoop.ParquetInputFormat.DEFAULT_READ_INT96_STATS_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.HADOOP_VECTORED_IO_DEFAULT; import static org.apache.parquet.hadoop.ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.OFF_HEAP_DECRYPT_BUFFER_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED; +import static org.apache.parquet.hadoop.ParquetInputFormat.READ_INT96_STATS_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.STATS_FILTERING_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter; import static org.apache.parquet.hadoop.UnmaterializableRecordCounter.BAD_RECORD_THRESHOLD_CONF_KEY; @@ -63,6 +65,7 @@ public class ParquetReadOptions { private static final boolean USE_OFF_HEAP_DECRYPT_BUFFER_DEFAULT = false; private final boolean useSignedStringMinMax; + private final boolean readInt96Stats; private final boolean useStatsFilter; private final boolean useDictionaryFilter; private final boolean useRecordFilter; @@ -91,6 +94,7 @@ public class ParquetReadOptions { boolean useBloomFilter, boolean useOffHeapDecryptBuffer, boolean useHadoopVectoredIo, + boolean readInt96Stats, FilterCompat.Filter recordFilter, ParquetMetadataConverter.MetadataFilter metadataFilter, CompressionCodecFactory codecFactory, @@ -109,6 +113,7 @@ public class ParquetReadOptions { useBloomFilter, useOffHeapDecryptBuffer, useHadoopVectoredIo, + readInt96Stats, recordFilter, metadataFilter, codecFactory, @@ -130,6 +135,7 @@ public class ParquetReadOptions { boolean 
useBloomFilter, boolean useOffHeapDecryptBuffer, boolean useHadoopVectoredIo, + boolean readInt96Stats, FilterCompat.Filter recordFilter, ParquetMetadataConverter.MetadataFilter metadataFilter, CompressionCodecFactory codecFactory, @@ -148,6 +154,7 @@ public class ParquetReadOptions { this.useBloomFilter = useBloomFilter; this.useOffHeapDecryptBuffer = useOffHeapDecryptBuffer; this.useHadoopVectoredIo = useHadoopVectoredIo; + this.readInt96Stats = readInt96Stats; this.recordFilter = recordFilter; this.metadataFilter = metadataFilter; this.codecFactory = codecFactory; @@ -163,6 +170,10 @@ public boolean useSignedStringMinMax() { return useSignedStringMinMax; } + public boolean readInt96Stats() { + return readInt96Stats; + } + public boolean useStatsFilter() { return useStatsFilter; } @@ -250,6 +261,7 @@ public static Builder builder(ParquetConfiguration conf) { public static class Builder { protected boolean useSignedStringMinMax = false; + protected boolean readInt96Stats = DEFAULT_READ_INT96_STATS_ENABLED; protected boolean useStatsFilter = STATS_FILTERING_ENABLED_DEFAULT; protected boolean useDictionaryFilter = DICTIONARY_FILTERING_ENABLED_DEFAULT; protected boolean useRecordFilter = RECORD_FILTERING_ENABLED_DEFAULT; @@ -287,6 +299,7 @@ public Builder(ParquetConfiguration conf) { withRecordFilter(getFilter(conf)); withMaxAllocationInBytes(conf.getInt(ALLOCATION_SIZE, 8388608)); withUseHadoopVectoredIo(conf.getBoolean(HADOOP_VECTORED_IO_ENABLED, HADOOP_VECTORED_IO_DEFAULT)); + readInt96Stats(conf.getBoolean(READ_INT96_STATS_ENABLED, DEFAULT_READ_INT96_STATS_ENABLED)); String badRecordThresh = conf.get(BAD_RECORD_THRESHOLD_CONF_KEY); if (badRecordThresh != null) { set(BAD_RECORD_THRESHOLD_CONF_KEY, badRecordThresh); @@ -338,6 +351,11 @@ public Builder withUseHadoopVectoredIo(boolean useHadoopVectoredIo) { return this; } + public Builder readInt96Stats(boolean readInt96Stats) { + this.readInt96Stats = readInt96Stats; + return this; + } + public Builder 
useColumnIndexFilter(boolean useColumnIndexFilter) { this.useColumnIndexFilter = useColumnIndexFilter; return this; @@ -437,6 +455,7 @@ public Builder copy(ParquetReadOptions options) { useRecordFilter(options.useRecordFilter); withRecordFilter(options.recordFilter); withUseHadoopVectoredIo(options.useHadoopVectoredIo); + readInt96Stats(options.readInt96Stats); withMetadataFilter(options.metadataFilter); withCodecFactory(options.codecFactory); withAllocator(options.allocator); @@ -469,6 +488,7 @@ public ParquetReadOptions build() { useBloomFilter, useOffHeapDecryptBuffer, useHadoopVectoredIo, + readInt96Stats, recordFilter, metadataFilter, codecFactory, diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java index 8e05d49bd3..ebc2bfe3e0 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java @@ -122,6 +122,16 @@ public class ParquetInputFormat extends FileInputFormat { */ public static final String STATS_FILTERING_ENABLED = "parquet.filter.stats.enabled"; + /** + * key to configure whether int96 stats are read and exposed + */ + public static final String READ_INT96_STATS_ENABLED = "parquet.read.int96stats.enabled"; + + /** + * default value for READ_INT96_STATS_ENABLED + */ + public static final boolean DEFAULT_READ_INT96_STATS_ENABLED = true; + /** * key to configure whether row group dictionary filtering is enabled */ From e1e2e8d399682ff32792cd99a7b276eb3c29909f Mon Sep 17 00:00:00 2001 From: Rahul Sharma Date: Wed, 11 Jun 2025 17:38:49 +0000 Subject: [PATCH 05/14] Compile correctly --- .../converter/ParquetMetadataConverter.java | 60 +++++++++++-------- .../parquet/hadoop/ParquetFileReader.java | 4 +- .../TestParquetMetadataConverter.java | 45 ++++++++------ .../parquet/hadoop/TestParquetFileWriter.java | 7 ++- 4 files 
changed, 68 insertions(+), 48 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index d20ac7faeb..2c458e8aa2 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -24,6 +24,8 @@ import static org.apache.parquet.format.Util.readFileMetaData; import static org.apache.parquet.format.Util.writeColumnMetaData; import static org.apache.parquet.format.Util.writePageHeader; +import static org.apache.parquet.hadoop.ParquetInputFormat.READ_INT96_STATS_ENABLED; +import static org.apache.parquet.hadoop.ParquetInputFormat.DEFAULT_READ_INT96_STATS_ENABLED; import java.io.BufferedInputStream; import java.io.ByteArrayInputStream; @@ -47,6 +49,7 @@ import org.apache.parquet.CorruptStatistics; import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.Preconditions; +import org.apache.parquet.ValidInt96Stats; import org.apache.parquet.column.EncodingStats; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.statistics.BinaryStatistics; @@ -154,13 +157,14 @@ public class ParquetMetadataConverter { new ConvertedTypeConverterVisitor(); private final int statisticsTruncateLength; private final boolean useSignedStringMinMax; + private final boolean readInt96Stats; public ParquetMetadataConverter() { - this(false); + this(false, DEFAULT_READ_INT96_STATS_ENABLED); } public ParquetMetadataConverter(int statisticsTruncateLength) { - this(false, statisticsTruncateLength); + this(false, statisticsTruncateLength, DEFAULT_READ_INT96_STATS_ENABLED); } /** @@ -169,23 +173,25 @@ public ParquetMetadataConverter(int statisticsTruncateLength) { */ @Deprecated public ParquetMetadataConverter(Configuration conf) { - 
this(conf.getBoolean("parquet.strings.signed-min-max.enabled", false)); + this(conf.getBoolean("parquet.strings.signed-min-max.enabled", false), conf.getBoolean(READ_INT96_STATS_ENABLED, + DEFAULT_READ_INT96_STATS_ENABLED)); } public ParquetMetadataConverter(ParquetReadOptions options) { - this(options.useSignedStringMinMax()); + this(options.useSignedStringMinMax(), options.readInt96Stats()); } - private ParquetMetadataConverter(boolean useSignedStringMinMax) { - this(useSignedStringMinMax, ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH); + private ParquetMetadataConverter(boolean useSignedStringMinMax, boolean readInt96Stats) { + this(useSignedStringMinMax, ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH, readInt96Stats); } - private ParquetMetadataConverter(boolean useSignedStringMinMax, int statisticsTruncateLength) { + private ParquetMetadataConverter(boolean useSignedStringMinMax, int statisticsTruncateLength, boolean readInt96Stats) { if (statisticsTruncateLength <= 0) { throw new IllegalArgumentException("Truncate length should be greater than 0"); } this.useSignedStringMinMax = useSignedStringMinMax; this.statisticsTruncateLength = statisticsTruncateLength; + this.readInt96Stats = readInt96Stats; } // NOTE: this cache is for memory savings, not cpu savings, and is used to de-duplicate @@ -601,7 +607,7 @@ private void addRowGroup( if (columnMetaData.getStatistics() != null && !columnMetaData.getStatistics().isEmpty()) { metaData.setStatistics( - toParquetStatistics(columnMetaData.getStatistics(), this.statisticsTruncateLength)); + toParquetStatistics(parquetMetadata.getFileMetaData().getCreatedBy(), columnMetaData.getStatistics(), this.statisticsTruncateLength)); } if (columnMetaData.getEncodingStats() != null) { metaData.setEncoding_stats(convertEncodingStats(columnMetaData.getEncodingStats())); @@ -762,12 +768,12 @@ public List convertEncodingStats(EncodingStats stats) { return formatStats; } - public static Statistics 
toParquetStatistics(org.apache.parquet.column.statistics.Statistics stats) { - return toParquetStatistics(stats, ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH); + public Statistics toParquetStatistics(String createdBy, org.apache.parquet.column.statistics.Statistics stats) { + return toParquetStatistics(createdBy, stats, ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH); } - public static Statistics toParquetStatistics( - org.apache.parquet.column.statistics.Statistics stats, int truncateLength) { + public Statistics toParquetStatistics( + String createdBy, org.apache.parquet.column.statistics.Statistics stats, int truncateLength) { Statistics formatStats = new Statistics(); // Don't write stats larger than the max size rather than truncating. The // rationale is that some engines may use the minimum value in the page as @@ -795,7 +801,7 @@ public static Statistics toParquetStatistics( formatStats.setMax(max); } - if (isMinMaxStatsSupported(stats.type()) || Arrays.equals(min, max)) { + if (isMinMaxStatsReadingSupported(createdBy, stats.type()) || Arrays.equals(min, max)) { formatStats.setMin_value(min); formatStats.setMax_value(max); } @@ -859,10 +865,17 @@ private static byte[] tuncateMax(BinaryTruncator truncator, int truncateLength, .getBytes(); } - private static boolean isMinMaxStatsSupported(PrimitiveType type) { + private static boolean isMinMaxStatsWritingSupported(PrimitiveType type) { return type.columnOrder().getColumnOrderName() == ColumnOrderName.TYPE_DEFINED_ORDER; } + private boolean isMinMaxStatsReadingSupported(String createdBy, PrimitiveType type) { + if (type.getPrimitiveTypeName() == PrimitiveTypeName.INT96) { + return readInt96Stats && ValidInt96Stats.hasValidInt96Stats(createdBy); + } + return isMinMaxStatsWritingSupported(type); + } + /** * @param statistics parquet format statistics * @param type a primitive type name @@ -870,7 +883,7 @@ private static boolean isMinMaxStatsSupported(PrimitiveType type) { * @deprecated will be 
removed in 2.0.0. */ @Deprecated - public static org.apache.parquet.column.statistics.Statistics fromParquetStatistics( + public org.apache.parquet.column.statistics.Statistics fromParquetStatistics( Statistics statistics, PrimitiveTypeName type) { return fromParquetStatistics(null, statistics, type); } @@ -883,7 +896,7 @@ public static org.apache.parquet.column.statistics.Statistics fromParquetStatist * @deprecated will be removed in 2.0.0. */ @Deprecated - public static org.apache.parquet.column.statistics.Statistics fromParquetStatistics( + public org.apache.parquet.column.statistics.Statistics fromParquetStatistics( String createdBy, Statistics statistics, PrimitiveTypeName type) { return fromParquetStatisticsInternal( createdBy, @@ -893,7 +906,7 @@ public static org.apache.parquet.column.statistics.Statistics fromParquetStatist } // Visible for testing - static org.apache.parquet.column.statistics.Statistics fromParquetStatisticsInternal( + org.apache.parquet.column.statistics.Statistics fromParquetStatisticsInternal( String createdBy, Statistics formatStats, PrimitiveType type, SortOrder typeSortOrder) { // create stats object based on the column type org.apache.parquet.column.statistics.Statistics.Builder statsBuilder = @@ -904,7 +917,7 @@ static org.apache.parquet.column.statistics.Statistics fromParquetStatisticsInte if (formatStats.isSetMin_value() && formatStats.isSetMax_value()) { byte[] min = formatStats.min_value.array(); byte[] max = formatStats.max_value.array(); - if (isMinMaxStatsSupported(type) || Arrays.equals(min, max)) { + if (isMinMaxStatsReadingSupported(createdBy, type) || Arrays.equals(min, max)) { statsBuilder.withMin(min); statsBuilder.withMax(max); } @@ -1991,8 +2004,7 @@ private void buildChildren( // the types // where ordering is not supported. 
if (columnOrder.getColumnOrderName() == ColumnOrderName.TYPE_DEFINED_ORDER - && (schemaElement.type == Type.INT96 - || schemaElement.converted_type == ConvertedType.INTERVAL)) { + && schemaElement.converted_type == ConvertedType.INTERVAL) { columnOrder = org.apache.parquet.schema.ColumnOrder.undefined(); } primitiveBuilder.columnOrder(columnOrder); @@ -2501,7 +2513,7 @@ private static org.apache.parquet.internal.column.columnindex.BoundaryOrder from public static ColumnIndex toParquetColumnIndex( PrimitiveType type, org.apache.parquet.internal.column.columnindex.ColumnIndex columnIndex) { - if (!isMinMaxStatsSupported(type) || columnIndex == null) { + if (!isMinMaxStatsWritingSupported(type) || columnIndex == null) { return null; } ColumnIndex parquetColumnIndex = new ColumnIndex( @@ -2521,9 +2533,9 @@ public static ColumnIndex toParquetColumnIndex( return parquetColumnIndex; } - public static org.apache.parquet.internal.column.columnindex.ColumnIndex fromParquetColumnIndex( - PrimitiveType type, ColumnIndex parquetColumnIndex) { - if (!isMinMaxStatsSupported(type)) { + public org.apache.parquet.internal.column.columnindex.ColumnIndex fromParquetColumnIndex( + String createdBy, PrimitiveType type, ColumnIndex parquetColumnIndex) { + if (!isMinMaxStatsReadingSupported(createdBy, type)) { return null; } return ColumnIndexBuilder.build( diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index b12a819cdd..612782c91b 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -1705,8 +1705,8 @@ public ColumnIndex readColumnIndex(ColumnChunkMetaData column) throws IOExceptio -1); } } - return ParquetMetadataConverter.fromParquetColumnIndex( - column.getPrimitiveType(), Util.readColumnIndex(f, columnIndexDecryptor, columnIndexAAD)); + 
return converter.fromParquetColumnIndex( + getFileMetaData().getCreatedBy(), column.getPrimitiveType(), Util.readColumnIndex(f, columnIndexDecryptor, columnIndexAAD)); } /** diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java index 2529f06ada..985d2a8f34 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java @@ -150,6 +150,7 @@ public class TestParquetMetadataConverter { private static final String CHAR_UPPER = CHAR_LOWER.toUpperCase(); private static final String NUMBER = "0123456789"; private static final String DATA_FOR_RANDOM_STRING = CHAR_LOWER + CHAR_UPPER + NUMBER; + private static final String CREATED_BY = "parquet-mr"; @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -801,9 +802,9 @@ private void testBinaryStats(StatsHelper helper) { Assert.assertFalse("Min_value should not be set", formatStats.isSetMin_value()); Assert.assertFalse("Max_value should not be set", formatStats.isSetMax_value()); Assert.assertFalse("Num nulls should not be set", formatStats.isSetNull_count()); - - Statistics roundTripStats = ParquetMetadataConverter.fromParquetStatisticsInternal( - Version.FULL_VERSION, + ParquetMetadataConverter converter = new ParquetMetadataConverter(); + Statistics roundTripStats = converter.fromParquetStatisticsInternal( + CREATED_BY, formatStats, new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, ""), ParquetMetadataConverter.SortOrder.SIGNED); @@ -842,7 +843,7 @@ private void testBinaryStatsWithTruncation(int truncateLen, int minLen, int maxL stats.updateStats(Binary.fromConstantByteArray(min)); stats.updateStats(Binary.fromConstantByteArray(max)); ParquetMetadataConverter metadataConverter = new 
ParquetMetadataConverter(truncateLen); - org.apache.parquet.format.Statistics formatStats = metadataConverter.toParquetStatistics(stats); + org.apache.parquet.format.Statistics formatStats = metadataConverter.toParquetStatistics(CREATED_BY, stats); if (minLen + maxLen >= ParquetMetadataConverter.MAX_STATS_SIZE) { assertNull(formatStats.getMin_value()); @@ -1051,7 +1052,7 @@ private void testStillUseStatsWithSignedSortOrderIfSingleValue(StatsHelper helpe PrimitiveType binaryType = Types.required(PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("b"); Statistics convertedStats = converter.fromParquetStatistics( - Version.FULL_VERSION, ParquetMetadataConverter.toParquetStatistics(stats), binaryType); + Version.FULL_VERSION, converter.toParquetStatistics(CREATED_BY, stats), binaryType); Assert.assertFalse("Stats should not be empty: " + convertedStats, convertedStats.isEmpty()); Assert.assertArrayEquals( @@ -1126,7 +1127,7 @@ public void testMissingValuesFromStats() { PrimitiveType type = Types.required(PrimitiveTypeName.INT32).named("test_int32"); org.apache.parquet.format.Statistics formatStats = new org.apache.parquet.format.Statistics(); - Statistics stats = converter.fromParquetStatistics(Version.FULL_VERSION, formatStats, type); + Statistics stats = converter.fromParquetStatistics(CREATED_BY, formatStats, type); assertFalse(stats.isNumNullsSet()); assertFalse(stats.hasNonNullValue()); assertTrue(stats.isEmpty()); @@ -1135,7 +1136,7 @@ public void testMissingValuesFromStats() { formatStats.clear(); formatStats.setMin(BytesUtils.intToBytes(-100)); formatStats.setMax(BytesUtils.intToBytes(100)); - stats = converter.fromParquetStatistics(Version.FULL_VERSION, formatStats, type); + stats = converter.fromParquetStatistics(CREATED_BY, formatStats, type); assertFalse(stats.isNumNullsSet()); assertTrue(stats.hasNonNullValue()); assertFalse(stats.isEmpty()); @@ -1145,7 +1146,7 @@ public void testMissingValuesFromStats() { formatStats.clear(); 
formatStats.setNull_count(2000); - stats = converter.fromParquetStatistics(Version.FULL_VERSION, formatStats, type); + stats = converter.fromParquetStatistics(CREATED_BY, formatStats, type); assertTrue(stats.isNumNullsSet()); assertFalse(stats.hasNonNullValue()); assertFalse(stats.isEmpty()); @@ -1169,7 +1170,8 @@ public void testSkippedV2Stats() { private void testSkippedV2Stats(PrimitiveType type, Object min, Object max) { Statistics stats = createStats(type, min, max); - org.apache.parquet.format.Statistics statistics = ParquetMetadataConverter.toParquetStatistics(stats); + ParquetMetadataConverter converter = new ParquetMetadataConverter(); + org.apache.parquet.format.Statistics statistics = converter.toParquetStatistics(CREATED_BY, stats); assertFalse(statistics.isSetMin()); assertFalse(statistics.isSetMax()); assertFalse(statistics.isSetMin_value()); @@ -1207,7 +1209,8 @@ public void testV2OnlyStats() { private void testV2OnlyStats(PrimitiveType type, Object min, Object max) { Statistics stats = createStats(type, min, max); - org.apache.parquet.format.Statistics statistics = ParquetMetadataConverter.toParquetStatistics(stats); + ParquetMetadataConverter converter = new ParquetMetadataConverter(); + org.apache.parquet.format.Statistics statistics = converter.toParquetStatistics(CREATED_BY, stats); assertFalse(statistics.isSetMin()); assertFalse(statistics.isSetMax()); assertEquals(ByteBuffer.wrap(stats.getMinBytes()), statistics.min_value); @@ -1249,7 +1252,8 @@ public void testV2StatsEqualMinMax() { private void testV2StatsEqualMinMax(PrimitiveType type, Object min, Object max) { Statistics stats = createStats(type, min, max); - org.apache.parquet.format.Statistics statistics = ParquetMetadataConverter.toParquetStatistics(stats); + ParquetMetadataConverter converter = new ParquetMetadataConverter(); + org.apache.parquet.format.Statistics statistics = converter.toParquetStatistics(CREATED_BY, stats); assertEquals(ByteBuffer.wrap(stats.getMinBytes()), 
statistics.min); assertEquals(ByteBuffer.wrap(stats.getMaxBytes()), statistics.max); assertEquals(ByteBuffer.wrap(stats.getMinBytes()), statistics.min_value); @@ -1340,7 +1344,8 @@ private enum StatsHelper { V1() { @Override public org.apache.parquet.format.Statistics toParquetStatistics(Statistics stats) { - org.apache.parquet.format.Statistics statistics = ParquetMetadataConverter.toParquetStatistics(stats); + ParquetMetadataConverter converter = new ParquetMetadataConverter(); + org.apache.parquet.format.Statistics statistics = converter.toParquetStatistics(CREATED_BY, stats); statistics.unsetMin_value(); statistics.unsetMax_value(); return statistics; @@ -1350,7 +1355,8 @@ public org.apache.parquet.format.Statistics toParquetStatistics(Statistics st V2() { @Override public org.apache.parquet.format.Statistics toParquetStatistics(Statistics stats) { - return ParquetMetadataConverter.toParquetStatistics(stats); + ParquetMetadataConverter converter = new ParquetMetadataConverter(); + return converter.toParquetStatistics(CREATED_BY, stats); } }; @@ -1447,9 +1453,11 @@ public void testColumnIndexConversion() { builder.add( stats, withSizeStats ? 
new SizeStatistics(type, 0, LongArrayList.of(5, 6), LongArrayList.of(2, 1)) : null); + ParquetMetadataConverter converter = new ParquetMetadataConverter(); org.apache.parquet.format.ColumnIndex parquetColumnIndex = - ParquetMetadataConverter.toParquetColumnIndex(type, builder.build()); - ColumnIndex columnIndex = ParquetMetadataConverter.fromParquetColumnIndex(type, parquetColumnIndex); + converter.toParquetColumnIndex(type, builder.build()); + + ColumnIndex columnIndex = converter.fromParquetColumnIndex(CREATED_BY, type, parquetColumnIndex); assertEquals(BoundaryOrder.ASCENDING, columnIndex.getBoundaryOrder()); assertTrue(Arrays.asList(false, true, false).equals(columnIndex.getNullPages())); assertTrue(Arrays.asList(16l, 111l, 0l).equals(columnIndex.getNullCounts())); @@ -1463,18 +1471,17 @@ public void testColumnIndexConversion() { ByteBuffer.allocate(0), ByteBuffer.wrap(BytesUtils.longToBytes(500l))) .equals(columnIndex.getMaxValues())); - assertNull( "Should handle null column index", - ParquetMetadataConverter.toParquetColumnIndex( + converter.toParquetColumnIndex( Types.required(PrimitiveTypeName.INT32).named("test_int32"), null)); assertNull( "Should ignore unsupported types", - ParquetMetadataConverter.toParquetColumnIndex( + converter.toParquetColumnIndex( Types.required(PrimitiveTypeName.INT96).named("test_int96"), columnIndex)); assertNull( "Should ignore unsupported types", - ParquetMetadataConverter.fromParquetColumnIndex( + converter.fromParquetColumnIndex(CREATED_BY, Types.required(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) .length(12) .as(OriginalType.INTERVAL) diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java index 3126e1746f..6848706475 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java @@ 
-869,10 +869,11 @@ public void testConvertToThriftStatistics() throws Exception { parquetMRstats.updateStats(l); } final String createdBy = "parquet-mr version 1.8.0 (build d4d5a07ec9bd262ca1e93c309f1d7d4a74ebda4c)"; - Statistics thriftStats = - org.apache.parquet.format.converter.ParquetMetadataConverter.toParquetStatistics(parquetMRstats); + org.apache.parquet.format.converter.ParquetMetadataConverter converter = new org.apache.parquet.format.converter.ParquetMetadataConverter(); + org.apache.parquet.format.Statistics thriftStats = + converter.toParquetStatistics(createdBy, parquetMRstats); LongStatistics convertedBackStats = - (LongStatistics) org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics( + (LongStatistics) converter.fromParquetStatistics( createdBy, thriftStats, PrimitiveTypeName.INT64); assertEquals(parquetMRstats.getMax(), convertedBackStats.getMax()); From 26c3ddd853c006d99433bf90f0209a772a5d7a23 Mon Sep 17 00:00:00 2001 From: Rahul Sharma Date: Wed, 11 Jun 2025 17:58:04 +0000 Subject: [PATCH 06/14] Fix more tests --- .../columnindex/TestBinaryTruncator.java | 20 +++++++++++++- .../parquet/schema/TestTypeBuilders.java | 4 +-- .../TestTypeBuildersWithLogicalTypes.java | 3 --- .../parquet/statistics/RandomValues.java | 26 +++++++++++++------ 4 files changed, 38 insertions(+), 15 deletions(-) diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestBinaryTruncator.java b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestBinaryTruncator.java index 8d85f3b84f..cde94cd7f5 100644 --- a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestBinaryTruncator.java +++ b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestBinaryTruncator.java @@ -32,6 +32,8 @@ import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import 
java.nio.charset.CharacterCodingException; import java.nio.charset.CharsetDecoder; import java.nio.charset.CodingErrorAction; @@ -93,7 +95,7 @@ public void testContractNonStringTypes() { testTruncator( Types.required(FIXED_LEN_BYTE_ARRAY).length(12).as(INTERVAL).named("test_fixed_interval"), false); testTruncator(Types.required(BINARY).as(DECIMAL).precision(10).scale(2).named("test_binary_decimal"), false); - testTruncator(Types.required(INT96).named("test_int96"), false); + testInt96Truncator(Types.required(INT96).named("test_int96"), false); } @Test @@ -157,6 +159,22 @@ public void testContractStringTypes() { testTruncator(Types.required(FIXED_LEN_BYTE_ARRAY).length(5).named("test_fixed"), true); } + private Binary createInt96Value(long nanoseconds, int julianDay) { + return Binary.fromConstantByteArray( + ByteBuffer.allocate(12) + .order(ByteOrder.LITTLE_ENDIAN) + .putLong(nanoseconds) + .putInt(julianDay) + .array()); + } + + private void testInt96Truncator(PrimitiveType type, boolean strict) { + BinaryTruncator truncator = BinaryTruncator.getTruncator(type); + Comparator comparator = type.comparator(); + checkContract(truncator, comparator, createInt96Value(0, 2458849), strict, strict); + checkContract(truncator, comparator, createInt96Value(100, 128849), strict, strict); + } + private void testTruncator(PrimitiveType type, boolean strict) { BinaryTruncator truncator = BinaryTruncator.getTruncator(type); Comparator comparator = type.comparator(); diff --git a/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeBuilders.java b/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeBuilders.java index 018ce5b276..a78720e83a 100644 --- a/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeBuilders.java +++ b/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeBuilders.java @@ -1334,7 +1334,7 @@ public void testTypeConstructionWithUndefinedColumnOrder() { @Test public void testTypeConstructionWithTypeDefinedColumnOrder() { 
PrimitiveTypeName[] types = - new PrimitiveTypeName[] {BOOLEAN, INT32, INT64, FLOAT, DOUBLE, BINARY, FIXED_LEN_BYTE_ARRAY}; + new PrimitiveTypeName[] {BOOLEAN, INT32, INT64, INT96, FLOAT, DOUBLE, BINARY, FIXED_LEN_BYTE_ARRAY}; for (PrimitiveTypeName type : types) { String name = type.toString() + "_"; int len = type == FIXED_LEN_BYTE_ARRAY ? 42 : 0; @@ -1350,8 +1350,6 @@ public void testTypeConstructionWithTypeDefinedColumnOrder() { @Test public void testTypeConstructionWithUnsupportedColumnOrder() { - assertThrows(null, IllegalArgumentException.class, (Callable) () -> - Types.optional(INT96).columnOrder(ColumnOrder.typeDefined()).named("int96_unsupported")); assertThrows(null, IllegalArgumentException.class, (Callable) () -> Types.optional(FIXED_LEN_BYTE_ARRAY) .length(12) diff --git a/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeBuildersWithLogicalTypes.java b/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeBuildersWithLogicalTypes.java index 61fe3065e1..8e1e50f6eb 100644 --- a/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeBuildersWithLogicalTypes.java +++ b/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeBuildersWithLogicalTypes.java @@ -349,9 +349,6 @@ public void testIntervalAnnotationRejectsNonFixed12() { @Test public void testTypeConstructionWithUnsupportedColumnOrder() { - assertThrows(null, IllegalArgumentException.class, () -> Types.optional(INT96) - .columnOrder(ColumnOrder.typeDefined()) - .named("int96_unsupported")); assertThrows(null, IllegalArgumentException.class, () -> Types.optional(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) .length(12) .as(LogicalTypeAnnotation.IntervalLogicalTypeAnnotation.getInstance()) diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java index f2cf26d868..92db9f1481 100644 --- 
a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java @@ -20,6 +20,8 @@ package org.apache.parquet.statistics; import java.math.BigInteger; +import java.nio.ByteOrder; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Comparator; import java.util.Iterator; @@ -213,11 +215,11 @@ public Long nextValue() { } } - public static class Int96Generator extends RandomBinaryBase { - private final RandomRange randomRange = new RandomRange(randomInt96(), randomInt96()); - private final BigInteger minimum = randomRange.minimum(); - private final BigInteger maximum = randomRange.maximum(); - private final BigInteger range = maximum.subtract(minimum); + public static class Int96Generator extends RandomBinaryBase { + private final RandomRange randomRangeJulianDay = new RandomRange<>(randomInt(), randomInt()); + private final int minimumJulianDay = randomRangeJulianDay.minimum(); + private final int maximumJulianDay = randomRangeJulianDay.maximum(); + private final int rangeJulianDay = (maximumJulianDay - minimumJulianDay); private static final int INT_96_LENGTH = 12; @@ -226,13 +228,21 @@ public Int96Generator(long seed) { } @Override - public BigInteger nextValue() { - return (minimum.add(randomInt96(range))); + public Binary nextValue() { + long timeOfDay = randomLong(); + int julianDay = minimumJulianDay + randomPositiveInt(rangeJulianDay); + + ByteBuffer.wrap(buffer) + .order(ByteOrder.LITTLE_ENDIAN) + .putLong(timeOfDay) + .putInt(julianDay); + + return Binary.fromReusedByteArray(buffer, 0, INT_96_LENGTH); } @Override public Binary nextBinaryValue() { - return FixedBinaryTestUtils.getFixedBinary(INT_96_LENGTH, nextValue()); + return nextValue(); } } From d65b7b826da2b525fb9f926fd5b78ed8261f7f50 Mon Sep 17 00:00:00 2001 From: Rahul Sharma Date: Wed, 11 Jun 2025 18:03:19 +0000 Subject: [PATCH 07/14] Formatting --- 
.../org/apache/parquet/ValidInt96Stats.java | 2 -- .../parquet/schema/PrimitiveComparator.java | 2 +- .../apache/parquet/schema/PrimitiveType.java | 5 +-- .../columnindex/TestBinaryTruncator.java | 11 +++---- .../schema/TestPrimitiveComparator.java | 26 +++++++-------- .../apache/parquet/ParquetReadOptions.java | 2 +- .../converter/ParquetMetadataConverter.java | 18 +++++++---- .../parquet/hadoop/ParquetFileReader.java | 4 ++- .../TestParquetMetadataConverter.java | 7 ++-- .../parquet/hadoop/TestParquetFileWriter.java | 10 +++--- .../parquet/statistics/RandomValues.java | 3 +- ...TestInt96TimestampStatisticsRoundTrip.java | 32 +++++++------------ 12 files changed, 56 insertions(+), 66 deletions(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/ValidInt96Stats.java b/parquet-column/src/main/java/org/apache/parquet/ValidInt96Stats.java index 9ce963493d..9d4235bcca 100644 --- a/parquet-column/src/main/java/org/apache/parquet/ValidInt96Stats.java +++ b/parquet-column/src/main/java/org/apache/parquet/ValidInt96Stats.java @@ -19,7 +19,6 @@ package org.apache.parquet; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.parquet.SemanticVersion.SemanticVersionParseException; import org.apache.parquet.VersionParser.ParsedVersion; import org.apache.parquet.VersionParser.VersionParseException; import org.slf4j.Logger; @@ -35,7 +34,6 @@ public class ValidInt96Stats { private static final Logger LOG = LoggerFactory.getLogger(ValidInt96Stats.class); - /** * Decides if the statistics from a file created by createdBy (the created_by field from parquet format) * should be trusted for INT96 columns. 
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java index 76312f9b6c..e7cedba8e6 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java @@ -206,7 +206,7 @@ public String toString() { } }; -/* + /* * This comparator is for comparing two timestamps represented as int96 binary. */ static final PrimitiveComparator BINARY_AS_INT_96_COMPARATOR = new BinaryComparator() { diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java index 81a8151ad1..940e731a1d 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java @@ -35,7 +35,6 @@ import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.PrimitiveConverter; import org.apache.parquet.io.api.RecordConsumer; -import org.apache.parquet.schema.ColumnOrder.ColumnOrderName; import org.apache.parquet.schema.LogicalTypeAnnotation.UUIDLogicalTypeAnnotation; /** @@ -542,9 +541,7 @@ public PrimitiveType( this.decimalMeta = decimalMeta; if (columnOrder == null) { - columnOrder = originalType == OriginalType.INTERVAL - ? ColumnOrder.undefined() - : ColumnOrder.typeDefined(); + columnOrder = originalType == OriginalType.INTERVAL ? 
ColumnOrder.undefined() : ColumnOrder.typeDefined(); } this.columnOrder = requireValidColumnOrder(columnOrder); } diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestBinaryTruncator.java b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestBinaryTruncator.java index cde94cd7f5..36f1aa0564 100644 --- a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestBinaryTruncator.java +++ b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestBinaryTruncator.java @@ -160,12 +160,11 @@ public void testContractStringTypes() { } private Binary createInt96Value(long nanoseconds, int julianDay) { - return Binary.fromConstantByteArray( - ByteBuffer.allocate(12) - .order(ByteOrder.LITTLE_ENDIAN) - .putLong(nanoseconds) - .putInt(julianDay) - .array()); + return Binary.fromConstantByteArray(ByteBuffer.allocate(12) + .order(ByteOrder.LITTLE_ENDIAN) + .putLong(nanoseconds) + .putInt(julianDay) + .array()); } private void testInt96Truncator(PrimitiveType type, boolean strict) { diff --git a/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java b/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java index e2a47a82ea..3175165ec1 100644 --- a/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java +++ b/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java @@ -19,8 +19,8 @@ package org.apache.parquet.schema; import static org.apache.parquet.schema.PrimitiveComparator.BINARY_AS_FLOAT16_COMPARATOR; -import static org.apache.parquet.schema.PrimitiveComparator.BINARY_AS_SIGNED_INTEGER_COMPARATOR; import static org.apache.parquet.schema.PrimitiveComparator.BINARY_AS_INT_96_COMPARATOR; +import static org.apache.parquet.schema.PrimitiveComparator.BINARY_AS_SIGNED_INTEGER_COMPARATOR; import static 
org.apache.parquet.schema.PrimitiveComparator.BOOLEAN_COMPARATOR; import static org.apache.parquet.schema.PrimitiveComparator.DOUBLE_COMPARATOR; import static org.apache.parquet.schema.PrimitiveComparator.FLOAT_COMPARATOR; @@ -37,8 +37,8 @@ import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; -import org.apache.parquet.io.api.Binary; import org.apache.parquet.example.data.simple.NanoTime; +import org.apache.parquet.io.api.Binary; import org.junit.Test; /* @@ -281,22 +281,22 @@ private Binary timestampToInt96(String timestamp) { LocalDateTime dt = LocalDateTime.parse(timestamp); long julianDay = dt.toLocalDate().toEpochDay() + 2440588; // Convert to Julian Day long nanos = dt.toLocalTime().toNanoOfDay(); - return new NanoTime((int)julianDay, nanos).toBinary(); + return new NanoTime((int) julianDay, nanos).toBinary(); } @Test public void testInt96Comparator() { Binary[] valuesInAscendingOrder = { - timestampToInt96("2020-01-01T00:00:00.000"), - timestampToInt96("2020-02-29T23:59:59.999"), - timestampToInt96("2020-12-31T23:59:59.999"), - timestampToInt96("2021-01-01T00:00:00.000"), - timestampToInt96("2023-06-15T12:30:45.500"), - timestampToInt96("2024-02-29T15:45:30.750"), - timestampToInt96("2024-12-25T07:00:00.000"), - timestampToInt96("2025-01-01T00:00:00.000"), - timestampToInt96("2025-07-04T20:00:00.000"), - timestampToInt96("2025-12-31T23:59:59.999") + timestampToInt96("2020-01-01T00:00:00.000"), + timestampToInt96("2020-02-29T23:59:59.999"), + timestampToInt96("2020-12-31T23:59:59.999"), + timestampToInt96("2021-01-01T00:00:00.000"), + timestampToInt96("2023-06-15T12:30:45.500"), + timestampToInt96("2024-02-29T15:45:30.750"), + timestampToInt96("2024-12-25T07:00:00.000"), + timestampToInt96("2025-01-01T00:00:00.000"), + timestampToInt96("2025-07-04T20:00:00.000"), + timestampToInt96("2025-12-31T23:59:59.999") }; for (int i = 0; i < valuesInAscendingOrder.length; ++i) { for (int j = 0; j < valuesInAscendingOrder.length; ++j) { diff 
--git a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java index c4945eadb9..58dc1f4349 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java @@ -28,8 +28,8 @@ import static org.apache.parquet.hadoop.ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.OFF_HEAP_DECRYPT_BUFFER_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED; -import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.READ_INT96_STATS_ENABLED; +import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.STATS_FILTERING_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter; import static org.apache.parquet.hadoop.UnmaterializableRecordCounter.BAD_RECORD_THRESHOLD_CONF_KEY; diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index 2c458e8aa2..6cd1fd6072 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -24,8 +24,8 @@ import static org.apache.parquet.format.Util.readFileMetaData; import static org.apache.parquet.format.Util.writeColumnMetaData; import static org.apache.parquet.format.Util.writePageHeader; -import static org.apache.parquet.hadoop.ParquetInputFormat.READ_INT96_STATS_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.DEFAULT_READ_INT96_STATS_ENABLED; +import static 
org.apache.parquet.hadoop.ParquetInputFormat.READ_INT96_STATS_ENABLED; import java.io.BufferedInputStream; import java.io.ByteArrayInputStream; @@ -173,8 +173,9 @@ public ParquetMetadataConverter(int statisticsTruncateLength) { */ @Deprecated public ParquetMetadataConverter(Configuration conf) { - this(conf.getBoolean("parquet.strings.signed-min-max.enabled", false), conf.getBoolean(READ_INT96_STATS_ENABLED, - DEFAULT_READ_INT96_STATS_ENABLED)); + this( + conf.getBoolean("parquet.strings.signed-min-max.enabled", false), + conf.getBoolean(READ_INT96_STATS_ENABLED, DEFAULT_READ_INT96_STATS_ENABLED)); } public ParquetMetadataConverter(ParquetReadOptions options) { @@ -185,7 +186,8 @@ private ParquetMetadataConverter(boolean useSignedStringMinMax, boolean readInt9 this(useSignedStringMinMax, ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH, readInt96Stats); } - private ParquetMetadataConverter(boolean useSignedStringMinMax, int statisticsTruncateLength, boolean readInt96Stats) { + private ParquetMetadataConverter( + boolean useSignedStringMinMax, int statisticsTruncateLength, boolean readInt96Stats) { if (statisticsTruncateLength <= 0) { throw new IllegalArgumentException("Truncate length should be greater than 0"); } @@ -606,8 +608,10 @@ private void addRowGroup( } if (columnMetaData.getStatistics() != null && !columnMetaData.getStatistics().isEmpty()) { - metaData.setStatistics( - toParquetStatistics(parquetMetadata.getFileMetaData().getCreatedBy(), columnMetaData.getStatistics(), this.statisticsTruncateLength)); + metaData.setStatistics(toParquetStatistics( + parquetMetadata.getFileMetaData().getCreatedBy(), + columnMetaData.getStatistics(), + this.statisticsTruncateLength)); } if (columnMetaData.getEncodingStats() != null) { metaData.setEncoding_stats(convertEncodingStats(columnMetaData.getEncodingStats())); @@ -874,7 +878,7 @@ private boolean isMinMaxStatsReadingSupported(String createdBy, PrimitiveType ty return readInt96Stats && 
ValidInt96Stats.hasValidInt96Stats(createdBy); } return isMinMaxStatsWritingSupported(type); - } + } /** * @param statistics parquet format statistics diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index 612782c91b..50cac92f8a 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -1706,7 +1706,9 @@ public ColumnIndex readColumnIndex(ColumnChunkMetaData column) throws IOExceptio } } return converter.fromParquetColumnIndex( - getFileMetaData().getCreatedBy(), column.getPrimitiveType(), Util.readColumnIndex(f, columnIndexDecryptor, columnIndexAAD)); + getFileMetaData().getCreatedBy(), + column.getPrimitiveType(), + Util.readColumnIndex(f, columnIndexDecryptor, columnIndexAAD)); } /** diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java index 985d2a8f34..52e4c92035 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java @@ -1455,8 +1455,8 @@ public void testColumnIndexConversion() { withSizeStats ? 
new SizeStatistics(type, 0, LongArrayList.of(5, 6), LongArrayList.of(2, 1)) : null); ParquetMetadataConverter converter = new ParquetMetadataConverter(); org.apache.parquet.format.ColumnIndex parquetColumnIndex = - converter.toParquetColumnIndex(type, builder.build()); - + converter.toParquetColumnIndex(type, builder.build()); + ColumnIndex columnIndex = converter.fromParquetColumnIndex(CREATED_BY, type, parquetColumnIndex); assertEquals(BoundaryOrder.ASCENDING, columnIndex.getBoundaryOrder()); assertTrue(Arrays.asList(false, true, false).equals(columnIndex.getNullPages())); @@ -1481,7 +1481,8 @@ public void testColumnIndexConversion() { Types.required(PrimitiveTypeName.INT96).named("test_int96"), columnIndex)); assertNull( "Should ignore unsupported types", - converter.fromParquetColumnIndex(CREATED_BY, + converter.fromParquetColumnIndex( + CREATED_BY, Types.required(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) .length(12) .as(OriginalType.INTERVAL) diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java index 6848706475..965b6e017d 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java @@ -82,7 +82,6 @@ import org.apache.parquet.column.values.bloomfilter.BloomFilter; import org.apache.parquet.example.data.Group; import org.apache.parquet.example.data.simple.SimpleGroup; -import org.apache.parquet.format.Statistics; import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel; import org.apache.parquet.hadoop.example.GroupReadSupport; import org.apache.parquet.hadoop.example.GroupWriteSupport; @@ -869,12 +868,11 @@ public void testConvertToThriftStatistics() throws Exception { parquetMRstats.updateStats(l); } final String createdBy = "parquet-mr version 1.8.0 (build d4d5a07ec9bd262ca1e93c309f1d7d4a74ebda4c)"; - 
org.apache.parquet.format.converter.ParquetMetadataConverter converter = new org.apache.parquet.format.converter.ParquetMetadataConverter(); - org.apache.parquet.format.Statistics thriftStats = - converter.toParquetStatistics(createdBy, parquetMRstats); + org.apache.parquet.format.converter.ParquetMetadataConverter converter = + new org.apache.parquet.format.converter.ParquetMetadataConverter(); + org.apache.parquet.format.Statistics thriftStats = converter.toParquetStatistics(createdBy, parquetMRstats); LongStatistics convertedBackStats = - (LongStatistics) converter.fromParquetStatistics( - createdBy, thriftStats, PrimitiveTypeName.INT64); + (LongStatistics) converter.fromParquetStatistics(createdBy, thriftStats, PrimitiveTypeName.INT64); assertEquals(parquetMRstats.getMax(), convertedBackStats.getMax()); assertEquals(parquetMRstats.getMin(), convertedBackStats.getMin()); diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java index 92db9f1481..20889f9664 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java @@ -20,15 +20,14 @@ package org.apache.parquet.statistics; import java.math.BigInteger; -import java.nio.ByteOrder; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.ArrayList; import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Random; import java.util.function.Supplier; -import org.apache.parquet.FixedBinaryTestUtils; import org.apache.parquet.io.api.Binary; public class RandomValues { diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestInt96TimestampStatisticsRoundTrip.java b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestInt96TimestampStatisticsRoundTrip.java index 19e8fb8bd6..13284da5d7 100644 --- 
a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestInt96TimestampStatisticsRoundTrip.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestInt96TimestampStatisticsRoundTrip.java @@ -24,18 +24,18 @@ import java.io.IOException; import java.time.LocalDateTime; +import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.ArrayList; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.example.data.Group; import org.apache.parquet.example.data.simple.NanoTime; import org.apache.parquet.example.data.simple.SimpleGroup; -import org.apache.parquet.hadoop.example.ExampleParquetWriter; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.io.api.Binary; @@ -50,9 +50,7 @@ public class TestInt96TimestampStatisticsRoundTrip { public final TemporaryFolder temp = new TemporaryFolder(); private MessageType createSchema() { - return Types.buildMessage() - .required(INT96).named("timestamp_field") - .named("root"); + return Types.buildMessage().required(INT96).named("timestamp_field").named("root"); } /** @@ -65,7 +63,7 @@ private Binary timestampToInt96(String timestamp) { LocalDateTime dt = LocalDateTime.parse(timestamp); long julianDay = dt.toLocalDate().toEpochDay() + 2440588; // Convert to Julian Day long nanos = dt.toLocalTime().toNanoOfDay(); - return new NanoTime((int)julianDay, nanos).toBinary(); + return new NanoTime((int) julianDay, nanos).toBinary(); } private void writeParquetFile(Path file, List timestampValues) throws IOException { @@ -83,13 +81,15 @@ private void writeParquetFile(Path file, List timestampValues) throws IO } } - private 
void verifyStatistics(Path file, Binary minValue, Binary maxValue, boolean readInt96Stats) throws IOException { + private void verifyStatistics(Path file, Binary minValue, Binary maxValue, boolean readInt96Stats) + throws IOException { Configuration conf = new Configuration(); conf.set("parquet.read.int96stats.enabled", readInt96Stats ? "true" : "false"); ParquetMetadata metadata = ParquetFileReader.readFooter(conf, file); - + // Verify INT96 statistics - ColumnChunkMetaData timestampColumn = metadata.getBlocks().get(0).getColumns().get(0); + ColumnChunkMetaData timestampColumn = + metadata.getBlocks().get(0).getColumns().get(0); Statistics timestampStats = timestampColumn.getStatistics(); if (readInt96Stats) { @@ -107,7 +107,7 @@ private void runTimestampTest(String[] timestamps) throws IOException { Binary maxValue = timestampToInt96(timestamps[timestamps.length - 1]); List timestampValues = new ArrayList<>(); for (String timestamp : timestamps) { - timestampValues.add(timestampToInt96(timestamp)); + timestampValues.add(timestampToInt96(timestamp)); } Collections.shuffle(timestampValues); @@ -136,21 +136,13 @@ public void testMultipleDates() throws IOException { @Test public void testSameDayDifferentTime() throws IOException { - String[] timestamps = { - "2020-01-01T00:01:00.000", - "2020-01-01T00:02:00.000", - "2020-01-01T00:03:00.000" - }; + String[] timestamps = {"2020-01-01T00:01:00.000", "2020-01-01T00:02:00.000", "2020-01-01T00:03:00.000"}; runTimestampTest(timestamps); } @Test public void testIncreasingDayDecreasingTime() throws IOException { - String[] timestamps = { - "2020-01-01T12:00:00.000", - "2020-02-01T11:00:00.000", - "2020-03-01T10:00:00.000" - }; + String[] timestamps = {"2020-01-01T12:00:00.000", "2020-02-01T11:00:00.000", "2020-03-01T10:00:00.000"}; runTimestampTest(timestamps); } } From 21bf8ae163e960441ea77ed6dd729983e217fc3a Mon Sep 17 00:00:00 2001 From: Rahul Sharma Date: Thu, 12 Jun 2025 09:43:32 +0000 Subject: [PATCH 08/14] Fix the 
comparator --- .../java/org/apache/parquet/schema/PrimitiveComparator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java index e7cedba8e6..9940d43242 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java @@ -221,7 +221,7 @@ int compareBinary(Binary b1, Binary b2) { if (jd1 != jd2) return Integer.compareUnsigned(jd1, jd2) < 0 ? -1 : 1; long s1 = bb1.getLong(0); long s2 = bb2.getLong(0); - if (s1 != s2) return Long.compareUnsigned(s1, s2) < 0 ? 1 : 1; + if (s1 != s2) return Long.compareUnsigned(s1, s2) < 0 ? -1 : 1; return 0; } From db9231c2cfea8e85a46010602a738ea5f7164caf Mon Sep 17 00:00:00 2001 From: Rahul Sharma Date: Mon, 30 Jun 2025 15:50:02 +0000 Subject: [PATCH 09/14] BINARY_AS_INT_96_COMPARATOR -> BINARY_AS_INT96_TIMESTAMP_COMPARATOR --- .../java/org/apache/parquet/schema/PrimitiveComparator.java | 4 ++-- .../main/java/org/apache/parquet/schema/PrimitiveType.java | 2 +- .../org/apache/parquet/schema/TestPrimitiveComparator.java | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java index 9940d43242..b37408a59d 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java @@ -209,7 +209,7 @@ public String toString() { /* * This comparator is for comparing two timestamps represented as int96 binary. 
*/ - static final PrimitiveComparator BINARY_AS_INT_96_COMPARATOR = new BinaryComparator() { + static final PrimitiveComparator BINARY_AS_INT96_TIMESTAMP_COMPARATOR = new BinaryComparator() { @Override int compareBinary(Binary b1, Binary b2) { ByteBuffer bb1 = b1.toByteBuffer(); @@ -227,7 +227,7 @@ int compareBinary(Binary b1, Binary b2) { @Override public String toString() { - return "BINARY_AS_INT_96_COMPARATOR"; + return "BINARY_AS_INT96_TIMESTAMP_COMPARATOR"; } }; diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java index 940e731a1d..3b11a9ffac 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java @@ -362,7 +362,7 @@ public T convert(PrimitiveTypeNameConverter conve @Override PrimitiveComparator comparator(LogicalTypeAnnotation logicalType) { - return PrimitiveComparator.BINARY_AS_INT_96_COMPARATOR; + return PrimitiveComparator.BINARY_AS_INT96_TIMESTAMP_COMPARATOR; } }, FIXED_LEN_BYTE_ARRAY("getBinary", Binary.class) { diff --git a/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java b/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java index 3175165ec1..027f350f3f 100644 --- a/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java +++ b/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java @@ -19,7 +19,7 @@ package org.apache.parquet.schema; import static org.apache.parquet.schema.PrimitiveComparator.BINARY_AS_FLOAT16_COMPARATOR; -import static org.apache.parquet.schema.PrimitiveComparator.BINARY_AS_INT_96_COMPARATOR; +import static org.apache.parquet.schema.PrimitiveComparator.BINARY_AS_INT96_TIMESTAMP_COMPARATOR; import static org.apache.parquet.schema.PrimitiveComparator.BINARY_AS_SIGNED_INTEGER_COMPARATOR; import 
static org.apache.parquet.schema.PrimitiveComparator.BOOLEAN_COMPARATOR; import static org.apache.parquet.schema.PrimitiveComparator.DOUBLE_COMPARATOR; @@ -302,7 +302,7 @@ public void testInt96Comparator() { for (int j = 0; j < valuesInAscendingOrder.length; ++j) { Binary bi = valuesInAscendingOrder[i]; Binary bj = valuesInAscendingOrder[j]; - assertEquals(Integer.compare(i, j), BINARY_AS_INT_96_COMPARATOR.compare(bi, bj)); + assertEquals(Integer.compare(i, j), BINARY_AS_INT96_TIMESTAMP_COMPARATOR.compare(bi, bj)); } } } From 85f3c3cf384ec6d09ce1ec1708edb888a41fa8df Mon Sep 17 00:00:00 2001 From: Rahul Sharma Date: Mon, 30 Jun 2025 16:08:34 +0000 Subject: [PATCH 10/14] Add clarification on the comparator --- .../java/org/apache/parquet/schema/PrimitiveComparator.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java index b37408a59d..e5b624324b 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java @@ -208,6 +208,9 @@ public String toString() { /* * This comparator is for comparing two timestamps represented as int96 binary. + * It is a two level comparison. 
+ * Days (last 4 bytes compared as unsigned Little endian int32), + * Nanoseconds (first 8 bytes compared as unsigned little endian int64) */ static final PrimitiveComparator BINARY_AS_INT96_TIMESTAMP_COMPARATOR = new BinaryComparator() { @Override From 5eef533e5e42a34df40fbd91ec29288bd43eb260 Mon Sep 17 00:00:00 2001 From: Rahul Sharma Date: Mon, 30 Jun 2025 16:16:40 +0000 Subject: [PATCH 11/14] Check parquet-mr version --- .../org/apache/parquet/ValidInt96Stats.java | 2 +- .../apache/parquet/ValidInt96StatsTest.java | 23 +++++++++++++++---- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/ValidInt96Stats.java b/parquet-column/src/main/java/org/apache/parquet/ValidInt96Stats.java index 9d4235bcca..c84dce1a79 100644 --- a/parquet-column/src/main/java/org/apache/parquet/ValidInt96Stats.java +++ b/parquet-column/src/main/java/org/apache/parquet/ValidInt96Stats.java @@ -50,7 +50,7 @@ public static boolean hasValidInt96Stats(String createdBy) { try { ParsedVersion version = VersionParser.parse(createdBy); if ("parquet-mr".equals(version.application)) { - return true; + return version.version != null && version.version.compareTo("1.15.0") > 0; } if ("parquet-mr compatible Photon".equals(version.application)) { return true; diff --git a/parquet-column/src/test/java/org/apache/parquet/ValidInt96StatsTest.java b/parquet-column/src/test/java/org/apache/parquet/ValidInt96StatsTest.java index edb3168af0..f190affad4 100644 --- a/parquet-column/src/test/java/org/apache/parquet/ValidInt96StatsTest.java +++ b/parquet-column/src/test/java/org/apache/parquet/ValidInt96StatsTest.java @@ -33,11 +33,24 @@ public void testNullAndEmpty() { @Test public void testParquetMrValid() { - assertTrue(ValidInt96Stats.hasValidInt96Stats("parquet-mr version 1.12.3")); - assertTrue(ValidInt96Stats.hasValidInt96Stats("parquet-mr version 1.12.3 (build abcd)")); - assertTrue(ValidInt96Stats.hasValidInt96Stats("parquet-mr version 
1.12.3-SNAPSHOT")); - assertTrue(ValidInt96Stats.hasValidInt96Stats("parquet-mr version 1.12.3rc1")); - assertTrue(ValidInt96Stats.hasValidInt96Stats("parquet-mr version 1.12.3rc1-SNAPSHOT")); + // Versions > 1.15.0 should be valid + assertTrue(ValidInt96Stats.hasValidInt96Stats("parquet-mr version 1.16.0")); + assertTrue(ValidInt96Stats.hasValidInt96Stats("parquet-mr version 1.15.1")); + assertTrue(ValidInt96Stats.hasValidInt96Stats("parquet-mr version 2.0.0")); + assertTrue(ValidInt96Stats.hasValidInt96Stats("parquet-mr version 1.16.0 (build abcd)")); + assertTrue(ValidInt96Stats.hasValidInt96Stats("parquet-mr version 1.15.1-SNAPSHOT")); + } + + @Test + public void testParquetMrInvalid() { + // Versions <= 1.15.0 should be invalid + assertFalse(ValidInt96Stats.hasValidInt96Stats("parquet-mr version 1.15.0")); + assertFalse(ValidInt96Stats.hasValidInt96Stats("parquet-mr version 1.12.3")); + assertFalse(ValidInt96Stats.hasValidInt96Stats("parquet-mr version 1.14.0")); + assertFalse(ValidInt96Stats.hasValidInt96Stats("parquet-mr version 1.12.3 (build abcd)")); + assertFalse(ValidInt96Stats.hasValidInt96Stats("parquet-mr version 1.12.3-SNAPSHOT")); + assertFalse(ValidInt96Stats.hasValidInt96Stats("parquet-mr version 1.12.3rc1")); + assertFalse(ValidInt96Stats.hasValidInt96Stats("parquet-mr version 1.12.3rc1-SNAPSHOT")); } @Test From efacf01f163477a1f36262bc00d9d1459785c61d Mon Sep 17 00:00:00 2001 From: Rahul Sharma Date: Tue, 12 Aug 2025 17:22:46 +0000 Subject: [PATCH 12/14] Add slicing to fix bug with the comparator --- .../parquet/schema/PrimitiveComparator.java | 4 +- .../schema/TestPrimitiveComparator.java | 47 ++++++++++++++----- 2 files changed, 38 insertions(+), 13 deletions(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java index e5b624324b..9c78606ef5 100644 --- 
a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java @@ -215,8 +215,8 @@ public String toString() { static final PrimitiveComparator BINARY_AS_INT96_TIMESTAMP_COMPARATOR = new BinaryComparator() { @Override int compareBinary(Binary b1, Binary b2) { - ByteBuffer bb1 = b1.toByteBuffer(); - ByteBuffer bb2 = b2.toByteBuffer(); + ByteBuffer bb1 = b1.toByteBuffer().slice(); + ByteBuffer bb2 = b2.toByteBuffer().slice(); bb1.order(java.nio.ByteOrder.LITTLE_ENDIAN); bb2.order(java.nio.ByteOrder.LITTLE_ENDIAN); int jd1 = bb1.getInt(8); diff --git a/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java b/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java index 027f350f3f..c508ff2afb 100644 --- a/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java +++ b/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java @@ -287,22 +287,47 @@ private Binary timestampToInt96(String timestamp) { @Test public void testInt96Comparator() { Binary[] valuesInAscendingOrder = { - timestampToInt96("2020-01-01T00:00:00.000"), - timestampToInt96("2020-02-29T23:59:59.999"), - timestampToInt96("2020-12-31T23:59:59.999"), - timestampToInt96("2021-01-01T00:00:00.000"), - timestampToInt96("2023-06-15T12:30:45.500"), - timestampToInt96("2024-02-29T15:45:30.750"), - timestampToInt96("2024-12-25T07:00:00.000"), - timestampToInt96("2025-01-01T00:00:00.000"), - timestampToInt96("2025-07-04T20:00:00.000"), - timestampToInt96("2025-12-31T23:59:59.999") + timestampToInt96("2020-01-01T00:00:00.000"), + timestampToInt96("2020-01-01T10:00:00.000"), + timestampToInt96("2020-02-29T23:59:59.999"), + timestampToInt96("2020-12-31T23:59:59.999"), + timestampToInt96("2021-01-01T00:00:00.000"), + timestampToInt96("2023-06-15T12:30:45.500"), + timestampToInt96("2024-02-29T15:45:30.750"), + 
timestampToInt96("2024-12-25T07:00:00.000"), + timestampToInt96("2025-01-01T00:00:00.000"), + timestampToInt96("2025-07-04T20:00:00.000"), + timestampToInt96("2025-07-04T20:50:00.000"), + timestampToInt96("2025-12-31T23:59:59.999") }; + + java.util.function.Function[] perturb = new java.util.function.Function[] { + (java.util.function.Function) b -> b, + (java.util.function.Function) b -> Binary.fromReusedByteArray(b.getBytes()), + (java.util.function.Function) b -> Binary.fromConstantByteArray(b.getBytes()), + (java.util.function.Function) b -> { + byte[] originalBytes = b.getBytes(); + byte[] paddedBuffer = new byte[originalBytes.length + 20]; + int offset = 10; + for (int i = 0; i < paddedBuffer.length; i++) { + paddedBuffer[i] = (byte) (0xAA + (i % 5)); + } + System.arraycopy(originalBytes, 0, paddedBuffer, offset, originalBytes.length); + return Binary.fromReusedByteArray(paddedBuffer, offset, originalBytes.length); + } + }; + for (int i = 0; i < valuesInAscendingOrder.length; ++i) { for (int j = 0; j < valuesInAscendingOrder.length; ++j) { Binary bi = valuesInAscendingOrder[i]; Binary bj = valuesInAscendingOrder[j]; - assertEquals(Integer.compare(i, j), BINARY_AS_INT96_TIMESTAMP_COMPARATOR.compare(bi, bj)); + for (java.util.function.Function fi : perturb) { + for (java.util.function.Function fj : perturb) { + Binary perturbedBi = fi.apply(bi); + Binary perturbedBj = fj.apply(bj); + assertEquals(Integer.compare(i, j), BINARY_AS_INT96_TIMESTAMP_COMPARATOR.compare(perturbedBi, perturbedBj)); + } + } } } } From 2df49cb085e2fec36dedf5c19c4503aa74ed3126 Mon Sep 17 00:00:00 2001 From: Rahul Sharma Date: Tue, 9 Sep 2025 14:51:33 +0200 Subject: [PATCH 13/14] Update parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java Co-authored-by: Gang Wu --- .../java/org/apache/parquet/schema/PrimitiveComparator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java index 9c78606ef5..01403bc9be 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java @@ -209,7 +209,7 @@ public String toString() { /* * This comparator is for comparing two timestamps represented as int96 binary. * It is a two level comparison. - * Days (last 4 bytes compared as unsigned Little endian int32), + * Days (last 4 bytes compared as unsigned little endian int32), * Nanoseconds (first 8 bytes compared as unsigned little endian int64) */ static final PrimitiveComparator BINARY_AS_INT96_TIMESTAMP_COMPARATOR = new BinaryComparator() { From 90656f1d4d9a1a9a41427362ca874b41adbbef58 Mon Sep 17 00:00:00 2001 From: Rahul Sharma Date: Tue, 9 Sep 2025 14:51:45 +0200 Subject: [PATCH 14/14] Update parquet-column/src/main/java/org/apache/parquet/ValidInt96Stats.java Co-authored-by: Gang Wu --- .../src/main/java/org/apache/parquet/ValidInt96Stats.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/ValidInt96Stats.java b/parquet-column/src/main/java/org/apache/parquet/ValidInt96Stats.java index c84dce1a79..8889ad0c4f 100644 --- a/parquet-column/src/main/java/org/apache/parquet/ValidInt96Stats.java +++ b/parquet-column/src/main/java/org/apache/parquet/ValidInt96Stats.java @@ -50,7 +50,7 @@ public static boolean hasValidInt96Stats(String createdBy) { try { ParsedVersion version = VersionParser.parse(createdBy); if ("parquet-mr".equals(version.application)) { - return version.version != null && version.version.compareTo("1.15.0") > 0; + return version.version != null && version.version.compareTo("1.16.0") > 0; } if ("parquet-mr compatible Photon".equals(version.application)) { return true;