diff --git a/parquet-column/src/main/java/org/apache/parquet/ValidInt96Stats.java b/parquet-column/src/main/java/org/apache/parquet/ValidInt96Stats.java
new file mode 100644
index 0000000000..8889ad0c4f
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/ValidInt96Stats.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.parquet.VersionParser.ParsedVersion;
+import org.apache.parquet.VersionParser.VersionParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Not all parquet writers populate the int96 statistics correctly. For example: arrow-rs
+ * https://github.com/apache/arrow-rs/blob/3ed9aedabc9e5a90170e43ff818f24a29eafb35b/parquet/src/file/statistics.rs#L212-L215
+ * This class is used to detect whether a file was written with a version that has correct int96 statistics.
+ */
+public class ValidInt96Stats {
+  private static final AtomicBoolean alreadyLogged = new AtomicBoolean(false);
+
+  private static final Logger LOG = LoggerFactory.getLogger(ValidInt96Stats.class);
+
+  /**
+   * Decides if the statistics from a file created by createdBy (the created_by field from parquet format)
+   * should be trusted for INT96 columns.
+   *
+   * @param createdBy the created-by string from a file footer
+   * @return true if the statistics are valid and can be trusted, false otherwise
+   */
+  public static boolean hasValidInt96Stats(String createdBy) {
+    if (Strings.isNullOrEmpty(createdBy)) {
+      warnOnce("Cannot verify INT96 statistics because created_by is null or empty");
+      return false;
+    }
+
+    try {
+      ParsedVersion version = VersionParser.parse(createdBy);
+      if ("parquet-mr".equals(version.application)) {
+        // Compare parsed semantic versions, not raw strings: lexicographic String.compareTo would
+        // reject "1.16.0" and "2.0.0"-style versions (e.g. "1.15.1" < "1.16.0" but "1.9.0" > "1.16.0").
+        return version.version != null && SemanticVersion.parse(version.version).compareTo(new SemanticVersion(1, 15, 0)) > 0;
+      }
+      if ("parquet-mr compatible Photon".equals(version.application)) {
+        return true;
+      }
+    } catch (RuntimeException | VersionParseException | SemanticVersion.SemanticVersionParseException e) {
+      warnParseErrorOnce(createdBy, e);
+    }
+    return false;
+  }
+
+  private static void warnParseErrorOnce(String createdBy, Throwable e) {
+    if (!alreadyLogged.getAndSet(true)) {
+      LOG.warn("Cannot verify INT96 statistics because created_by could not be parsed: " + createdBy, e);
+    }
+  }
+
+  private static void warnOnce(String message) {
+    if (!alreadyLogged.getAndSet(true)) {
+      LOG.warn(message);
+    }
+  }
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java
index 50c4acd4c9..01403bc9be 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java
@@ -206,6 +206,34 @@ public String toString() {
     }
   };
 
+  /*
+   * This comparator is for comparing two timestamps represented as int96 binary.
+ * It is a two level comparison. + * Days (last 4 bytes compared as unsigned little endian int32), + * Nanoseconds (first 8 bytes compared as unsigned little endian int64) + */ + static final PrimitiveComparator BINARY_AS_INT96_TIMESTAMP_COMPARATOR = new BinaryComparator() { + @Override + int compareBinary(Binary b1, Binary b2) { + ByteBuffer bb1 = b1.toByteBuffer().slice(); + ByteBuffer bb2 = b2.toByteBuffer().slice(); + bb1.order(java.nio.ByteOrder.LITTLE_ENDIAN); + bb2.order(java.nio.ByteOrder.LITTLE_ENDIAN); + int jd1 = bb1.getInt(8); + int jd2 = bb2.getInt(8); + if (jd1 != jd2) return Integer.compareUnsigned(jd1, jd2) < 0 ? -1 : 1; + long s1 = bb1.getLong(0); + long s2 = bb2.getLong(0); + if (s1 != s2) return Long.compareUnsigned(s1, s2) < 0 ? -1 : 1; + return 0; + } + + @Override + public String toString() { + return "BINARY_AS_INT96_TIMESTAMP_COMPARATOR"; + } + }; + /* * This comparator is for comparing two signed decimal values represented in twos-complement binary. In case of the * binary length of one value is shorter than the other it will be padded by the corresponding prefix (0xFF for diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java index 6beff4da93..3b11a9ffac 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java @@ -35,7 +35,6 @@ import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.PrimitiveConverter; import org.apache.parquet.io.api.RecordConsumer; -import org.apache.parquet.schema.ColumnOrder.ColumnOrderName; import org.apache.parquet.schema.LogicalTypeAnnotation.UUIDLogicalTypeAnnotation; /** @@ -363,7 +362,7 @@ public T convert(PrimitiveTypeNameConverter conve @Override PrimitiveComparator comparator(LogicalTypeAnnotation logicalType) { - return PrimitiveComparator.BINARY_AS_SIGNED_INTEGER_COMPARATOR; 
+ return PrimitiveComparator.BINARY_AS_INT96_TIMESTAMP_COMPARATOR; } }, FIXED_LEN_BYTE_ARRAY("getBinary", Binary.class) { @@ -542,9 +541,7 @@ public PrimitiveType( this.decimalMeta = decimalMeta; if (columnOrder == null) { - columnOrder = primitive == PrimitiveTypeName.INT96 || originalType == OriginalType.INTERVAL - ? ColumnOrder.undefined() - : ColumnOrder.typeDefined(); + columnOrder = originalType == OriginalType.INTERVAL ? ColumnOrder.undefined() : ColumnOrder.typeDefined(); } this.columnOrder = requireValidColumnOrder(columnOrder); } @@ -587,8 +584,7 @@ public PrimitiveType( } if (columnOrder == null) { - columnOrder = primitive == PrimitiveTypeName.INT96 - || logicalTypeAnnotation instanceof LogicalTypeAnnotation.IntervalLogicalTypeAnnotation + columnOrder = logicalTypeAnnotation instanceof LogicalTypeAnnotation.IntervalLogicalTypeAnnotation ? ColumnOrder.undefined() : ColumnOrder.typeDefined(); } @@ -596,12 +592,6 @@ public PrimitiveType( } private ColumnOrder requireValidColumnOrder(ColumnOrder columnOrder) { - if (primitive == PrimitiveTypeName.INT96) { - Preconditions.checkArgument( - columnOrder.getColumnOrderName() == ColumnOrderName.UNDEFINED, - "The column order %s is not supported by INT96", - columnOrder); - } if (getLogicalTypeAnnotation() != null) { Preconditions.checkArgument( getLogicalTypeAnnotation().isValidColumnOrder(columnOrder), diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/Types.java b/parquet-column/src/main/java/org/apache/parquet/schema/Types.java index fd82d36768..0214eab259 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/Types.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/Types.java @@ -413,8 +413,8 @@ public THIS scale(int scale) { /** * Adds the column order for the primitive type. *

- * In case of not set the default column order is {@link ColumnOrderName#TYPE_DEFINED_ORDER} except the type - * {@link PrimitiveTypeName#INT96} and the types annotated by {@link OriginalType#INTERVAL} where the default column + * In case of not set the default column order is {@link ColumnOrderName#TYPE_DEFINED_ORDER} except the types + * annotated by {@link OriginalType#INTERVAL} where the default column * order is {@link ColumnOrderName#UNDEFINED}. * * @param columnOrder the column order for the primitive type diff --git a/parquet-column/src/test/java/org/apache/parquet/ValidInt96StatsTest.java b/parquet-column/src/test/java/org/apache/parquet/ValidInt96StatsTest.java new file mode 100644 index 0000000000..f190affad4 --- /dev/null +++ b/parquet-column/src/test/java/org/apache/parquet/ValidInt96StatsTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; + +public class ValidInt96StatsTest { + + @Test + public void testNullAndEmpty() { + assertFalse(ValidInt96Stats.hasValidInt96Stats(null)); + assertFalse(ValidInt96Stats.hasValidInt96Stats("")); + } + + @Test + public void testParquetMrValid() { + // Versions > 1.15.0 should be valid + assertTrue(ValidInt96Stats.hasValidInt96Stats("parquet-mr version 1.16.0")); + assertTrue(ValidInt96Stats.hasValidInt96Stats("parquet-mr version 1.15.1")); + assertTrue(ValidInt96Stats.hasValidInt96Stats("parquet-mr version 2.0.0")); + assertTrue(ValidInt96Stats.hasValidInt96Stats("parquet-mr version 1.16.0 (build abcd)")); + assertTrue(ValidInt96Stats.hasValidInt96Stats("parquet-mr version 1.15.1-SNAPSHOT")); + } + + @Test + public void testParquetMrInvalid() { + // Versions <= 1.15.0 should be invalid + assertFalse(ValidInt96Stats.hasValidInt96Stats("parquet-mr version 1.15.0")); + assertFalse(ValidInt96Stats.hasValidInt96Stats("parquet-mr version 1.12.3")); + assertFalse(ValidInt96Stats.hasValidInt96Stats("parquet-mr version 1.14.0")); + assertFalse(ValidInt96Stats.hasValidInt96Stats("parquet-mr version 1.12.3 (build abcd)")); + assertFalse(ValidInt96Stats.hasValidInt96Stats("parquet-mr version 1.12.3-SNAPSHOT")); + assertFalse(ValidInt96Stats.hasValidInt96Stats("parquet-mr version 1.12.3rc1")); + assertFalse(ValidInt96Stats.hasValidInt96Stats("parquet-mr version 1.12.3rc1-SNAPSHOT")); + } + + @Test + public void testParquetMrCompatiblePhotonValid() { + assertTrue(ValidInt96Stats.hasValidInt96Stats("parquet-mr compatible Photon version 1.0.0")); + assertTrue(ValidInt96Stats.hasValidInt96Stats("parquet-mr compatible Photon version 1.0.0 (build abcd)")); + assertTrue(ValidInt96Stats.hasValidInt96Stats("parquet-mr compatible Photon version 1.0.0-SNAPSHOT")); + assertTrue(ValidInt96Stats.hasValidInt96Stats("parquet-mr compatible Photon 
version 1.0.0rc1")); + assertTrue(ValidInt96Stats.hasValidInt96Stats("parquet-mr compatible Photon version 1.0.0rc1-SNAPSHOT")); + } + + @Test + public void testInvalidApplications() { + assertFalse(ValidInt96Stats.hasValidInt96Stats("arrow-rs version 0.1.0")); + assertFalse(ValidInt96Stats.hasValidInt96Stats("impala version 1.6.0")); + } +} diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestBinaryTruncator.java b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestBinaryTruncator.java index 8d85f3b84f..36f1aa0564 100644 --- a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestBinaryTruncator.java +++ b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestBinaryTruncator.java @@ -32,6 +32,8 @@ import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.nio.charset.CharacterCodingException; import java.nio.charset.CharsetDecoder; import java.nio.charset.CodingErrorAction; @@ -93,7 +95,7 @@ public void testContractNonStringTypes() { testTruncator( Types.required(FIXED_LEN_BYTE_ARRAY).length(12).as(INTERVAL).named("test_fixed_interval"), false); testTruncator(Types.required(BINARY).as(DECIMAL).precision(10).scale(2).named("test_binary_decimal"), false); - testTruncator(Types.required(INT96).named("test_int96"), false); + testInt96Truncator(Types.required(INT96).named("test_int96"), false); } @Test @@ -157,6 +159,21 @@ public void testContractStringTypes() { testTruncator(Types.required(FIXED_LEN_BYTE_ARRAY).length(5).named("test_fixed"), true); } + private Binary createInt96Value(long nanoseconds, int julianDay) { + return Binary.fromConstantByteArray(ByteBuffer.allocate(12) + .order(ByteOrder.LITTLE_ENDIAN) + .putLong(nanoseconds) + .putInt(julianDay) + .array()); + } + + private void testInt96Truncator(PrimitiveType type, boolean strict) { + 
BinaryTruncator truncator = BinaryTruncator.getTruncator(type); + Comparator comparator = type.comparator(); + checkContract(truncator, comparator, createInt96Value(0, 2458849), strict, strict); + checkContract(truncator, comparator, createInt96Value(100, 128849), strict, strict); + } + private void testTruncator(PrimitiveType type, boolean strict) { BinaryTruncator truncator = BinaryTruncator.getTruncator(type); Comparator comparator = type.comparator(); diff --git a/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java b/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java index d3d1b15bc6..c508ff2afb 100644 --- a/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java +++ b/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java @@ -19,6 +19,7 @@ package org.apache.parquet.schema; import static org.apache.parquet.schema.PrimitiveComparator.BINARY_AS_FLOAT16_COMPARATOR; +import static org.apache.parquet.schema.PrimitiveComparator.BINARY_AS_INT96_TIMESTAMP_COMPARATOR; import static org.apache.parquet.schema.PrimitiveComparator.BINARY_AS_SIGNED_INTEGER_COMPARATOR; import static org.apache.parquet.schema.PrimitiveComparator.BOOLEAN_COMPARATOR; import static org.apache.parquet.schema.PrimitiveComparator.DOUBLE_COMPARATOR; @@ -33,8 +34,10 @@ import java.math.BigInteger; import java.nio.ByteBuffer; +import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; +import org.apache.parquet.example.data.simple.NanoTime; import org.apache.parquet.io.api.Binary; import org.junit.Test; @@ -274,6 +277,61 @@ public void testBinaryAsSignedIntegerComparatorWithEquals() { } } + private Binary timestampToInt96(String timestamp) { + LocalDateTime dt = LocalDateTime.parse(timestamp); + long julianDay = dt.toLocalDate().toEpochDay() + 2440588; // Convert to Julian Day + long nanos = dt.toLocalTime().toNanoOfDay(); + return new NanoTime((int) 
julianDay, nanos).toBinary(); + } + + @Test + public void testInt96Comparator() { + Binary[] valuesInAscendingOrder = { + timestampToInt96("2020-01-01T00:00:00.000"), + timestampToInt96("2020-01-01T10:00:00.000"), + timestampToInt96("2020-02-29T23:59:59.999"), + timestampToInt96("2020-12-31T23:59:59.999"), + timestampToInt96("2021-01-01T00:00:00.000"), + timestampToInt96("2023-06-15T12:30:45.500"), + timestampToInt96("2024-02-29T15:45:30.750"), + timestampToInt96("2024-12-25T07:00:00.000"), + timestampToInt96("2025-01-01T00:00:00.000"), + timestampToInt96("2025-07-04T20:00:00.000"), + timestampToInt96("2025-07-04T20:50:00.000"), + timestampToInt96("2025-12-31T23:59:59.999") + }; + + java.util.function.Function[] perturb = new java.util.function.Function[] { + (java.util.function.Function) b -> b, + (java.util.function.Function) b -> Binary.fromReusedByteArray(b.getBytes()), + (java.util.function.Function) b -> Binary.fromConstantByteArray(b.getBytes()), + (java.util.function.Function) b -> { + byte[] originalBytes = b.getBytes(); + byte[] paddedBuffer = new byte[originalBytes.length + 20]; + int offset = 10; + for (int i = 0; i < paddedBuffer.length; i++) { + paddedBuffer[i] = (byte) (0xAA + (i % 5)); + } + System.arraycopy(originalBytes, 0, paddedBuffer, offset, originalBytes.length); + return Binary.fromReusedByteArray(paddedBuffer, offset, originalBytes.length); + } + }; + + for (int i = 0; i < valuesInAscendingOrder.length; ++i) { + for (int j = 0; j < valuesInAscendingOrder.length; ++j) { + Binary bi = valuesInAscendingOrder[i]; + Binary bj = valuesInAscendingOrder[j]; + for (java.util.function.Function fi : perturb) { + for (java.util.function.Function fj : perturb) { + Binary perturbedBi = fi.apply(bi); + Binary perturbedBj = fj.apply(bj); + assertEquals(Integer.compare(i, j), BINARY_AS_INT96_TIMESTAMP_COMPARATOR.compare(perturbedBi, perturbedBj)); + } + } + } + } + } + @Test public void testFloat16Comparator() { Binary[] valuesInAscendingOrder = { diff 
--git a/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeBuilders.java b/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeBuilders.java index 018ce5b276..a78720e83a 100644 --- a/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeBuilders.java +++ b/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeBuilders.java @@ -1334,7 +1334,7 @@ public void testTypeConstructionWithUndefinedColumnOrder() { @Test public void testTypeConstructionWithTypeDefinedColumnOrder() { PrimitiveTypeName[] types = - new PrimitiveTypeName[] {BOOLEAN, INT32, INT64, FLOAT, DOUBLE, BINARY, FIXED_LEN_BYTE_ARRAY}; + new PrimitiveTypeName[] {BOOLEAN, INT32, INT64, INT96, FLOAT, DOUBLE, BINARY, FIXED_LEN_BYTE_ARRAY}; for (PrimitiveTypeName type : types) { String name = type.toString() + "_"; int len = type == FIXED_LEN_BYTE_ARRAY ? 42 : 0; @@ -1350,8 +1350,6 @@ public void testTypeConstructionWithTypeDefinedColumnOrder() { @Test public void testTypeConstructionWithUnsupportedColumnOrder() { - assertThrows(null, IllegalArgumentException.class, (Callable) () -> - Types.optional(INT96).columnOrder(ColumnOrder.typeDefined()).named("int96_unsupported")); assertThrows(null, IllegalArgumentException.class, (Callable) () -> Types.optional(FIXED_LEN_BYTE_ARRAY) .length(12) diff --git a/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeBuildersWithLogicalTypes.java b/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeBuildersWithLogicalTypes.java index 61fe3065e1..8e1e50f6eb 100644 --- a/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeBuildersWithLogicalTypes.java +++ b/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeBuildersWithLogicalTypes.java @@ -349,9 +349,6 @@ public void testIntervalAnnotationRejectsNonFixed12() { @Test public void testTypeConstructionWithUnsupportedColumnOrder() { - assertThrows(null, IllegalArgumentException.class, () -> Types.optional(INT96) - 
.columnOrder(ColumnOrder.typeDefined()) - .named("int96_unsupported")); assertThrows(null, IllegalArgumentException.class, () -> Types.optional(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) .length(12) .as(LogicalTypeAnnotation.IntervalLogicalTypeAnnotation.getInstance()) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java index e277fcde5f..eff05f5a7c 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java @@ -44,6 +44,7 @@ private HadoopReadOptions( boolean useBloomFilter, boolean useOffHeapDecryptBuffer, boolean useHadoopVectoredIo, + boolean readInt96Stats, FilterCompat.Filter recordFilter, MetadataFilter metadataFilter, CompressionCodecFactory codecFactory, @@ -63,6 +64,7 @@ private HadoopReadOptions( useBloomFilter, useOffHeapDecryptBuffer, useHadoopVectoredIo, + readInt96Stats, recordFilter, metadataFilter, codecFactory, @@ -126,6 +128,7 @@ public ParquetReadOptions build() { useBloomFilter, useOffHeapDecryptBuffer, useHadoopVectoredIo, + readInt96Stats, recordFilter, metadataFilter, codecFactory, diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java index 895d0670fa..58dc1f4349 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java @@ -22,11 +22,13 @@ import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; import static org.apache.parquet.hadoop.ParquetInputFormat.BLOOM_FILTERING_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED; +import static org.apache.parquet.hadoop.ParquetInputFormat.DEFAULT_READ_INT96_STATS_ENABLED; import static 
org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.HADOOP_VECTORED_IO_DEFAULT; import static org.apache.parquet.hadoop.ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.OFF_HEAP_DECRYPT_BUFFER_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED; +import static org.apache.parquet.hadoop.ParquetInputFormat.READ_INT96_STATS_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.STATS_FILTERING_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter; @@ -63,6 +65,7 @@ public class ParquetReadOptions { private static final boolean USE_OFF_HEAP_DECRYPT_BUFFER_DEFAULT = false; private final boolean useSignedStringMinMax; + private final boolean readInt96Stats; private final boolean useStatsFilter; private final boolean useDictionaryFilter; private final boolean useRecordFilter; @@ -91,6 +94,7 @@ public class ParquetReadOptions { boolean useBloomFilter, boolean useOffHeapDecryptBuffer, boolean useHadoopVectoredIo, + boolean readInt96Stats, FilterCompat.Filter recordFilter, ParquetMetadataConverter.MetadataFilter metadataFilter, CompressionCodecFactory codecFactory, @@ -109,6 +113,7 @@ public class ParquetReadOptions { useBloomFilter, useOffHeapDecryptBuffer, useHadoopVectoredIo, + readInt96Stats, recordFilter, metadataFilter, codecFactory, @@ -130,6 +135,7 @@ public class ParquetReadOptions { boolean useBloomFilter, boolean useOffHeapDecryptBuffer, boolean useHadoopVectoredIo, + boolean readInt96Stats, FilterCompat.Filter recordFilter, ParquetMetadataConverter.MetadataFilter metadataFilter, CompressionCodecFactory codecFactory, @@ -148,6 +154,7 @@ public class ParquetReadOptions { this.useBloomFilter = useBloomFilter; this.useOffHeapDecryptBuffer = useOffHeapDecryptBuffer; 
this.useHadoopVectoredIo = useHadoopVectoredIo; + this.readInt96Stats = readInt96Stats; this.recordFilter = recordFilter; this.metadataFilter = metadataFilter; this.codecFactory = codecFactory; @@ -163,6 +170,10 @@ public boolean useSignedStringMinMax() { return useSignedStringMinMax; } + public boolean readInt96Stats() { + return readInt96Stats; + } + public boolean useStatsFilter() { return useStatsFilter; } @@ -250,6 +261,7 @@ public static Builder builder(ParquetConfiguration conf) { public static class Builder { protected boolean useSignedStringMinMax = false; + protected boolean readInt96Stats = DEFAULT_READ_INT96_STATS_ENABLED; protected boolean useStatsFilter = STATS_FILTERING_ENABLED_DEFAULT; protected boolean useDictionaryFilter = DICTIONARY_FILTERING_ENABLED_DEFAULT; protected boolean useRecordFilter = RECORD_FILTERING_ENABLED_DEFAULT; @@ -287,6 +299,7 @@ public Builder(ParquetConfiguration conf) { withRecordFilter(getFilter(conf)); withMaxAllocationInBytes(conf.getInt(ALLOCATION_SIZE, 8388608)); withUseHadoopVectoredIo(conf.getBoolean(HADOOP_VECTORED_IO_ENABLED, HADOOP_VECTORED_IO_DEFAULT)); + readInt96Stats(conf.getBoolean(READ_INT96_STATS_ENABLED, DEFAULT_READ_INT96_STATS_ENABLED)); String badRecordThresh = conf.get(BAD_RECORD_THRESHOLD_CONF_KEY); if (badRecordThresh != null) { set(BAD_RECORD_THRESHOLD_CONF_KEY, badRecordThresh); @@ -338,6 +351,11 @@ public Builder withUseHadoopVectoredIo(boolean useHadoopVectoredIo) { return this; } + public Builder readInt96Stats(boolean readInt96Stats) { + this.readInt96Stats = readInt96Stats; + return this; + } + public Builder useColumnIndexFilter(boolean useColumnIndexFilter) { this.useColumnIndexFilter = useColumnIndexFilter; return this; @@ -437,6 +455,7 @@ public Builder copy(ParquetReadOptions options) { useRecordFilter(options.useRecordFilter); withRecordFilter(options.recordFilter); withUseHadoopVectoredIo(options.useHadoopVectoredIo); + readInt96Stats(options.readInt96Stats); 
withMetadataFilter(options.metadataFilter); withCodecFactory(options.codecFactory); withAllocator(options.allocator); @@ -469,6 +488,7 @@ public ParquetReadOptions build() { useBloomFilter, useOffHeapDecryptBuffer, useHadoopVectoredIo, + readInt96Stats, recordFilter, metadataFilter, codecFactory, diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index d20ac7faeb..6cd1fd6072 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -24,6 +24,8 @@ import static org.apache.parquet.format.Util.readFileMetaData; import static org.apache.parquet.format.Util.writeColumnMetaData; import static org.apache.parquet.format.Util.writePageHeader; +import static org.apache.parquet.hadoop.ParquetInputFormat.DEFAULT_READ_INT96_STATS_ENABLED; +import static org.apache.parquet.hadoop.ParquetInputFormat.READ_INT96_STATS_ENABLED; import java.io.BufferedInputStream; import java.io.ByteArrayInputStream; @@ -47,6 +49,7 @@ import org.apache.parquet.CorruptStatistics; import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.Preconditions; +import org.apache.parquet.ValidInt96Stats; import org.apache.parquet.column.EncodingStats; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.statistics.BinaryStatistics; @@ -154,13 +157,14 @@ public class ParquetMetadataConverter { new ConvertedTypeConverterVisitor(); private final int statisticsTruncateLength; private final boolean useSignedStringMinMax; + private final boolean readInt96Stats; public ParquetMetadataConverter() { - this(false); + this(false, DEFAULT_READ_INT96_STATS_ENABLED); } public ParquetMetadataConverter(int statisticsTruncateLength) { - this(false, statisticsTruncateLength); + 
this(false, statisticsTruncateLength, DEFAULT_READ_INT96_STATS_ENABLED); } /** @@ -169,23 +173,27 @@ public ParquetMetadataConverter(int statisticsTruncateLength) { */ @Deprecated public ParquetMetadataConverter(Configuration conf) { - this(conf.getBoolean("parquet.strings.signed-min-max.enabled", false)); + this( + conf.getBoolean("parquet.strings.signed-min-max.enabled", false), + conf.getBoolean(READ_INT96_STATS_ENABLED, DEFAULT_READ_INT96_STATS_ENABLED)); } public ParquetMetadataConverter(ParquetReadOptions options) { - this(options.useSignedStringMinMax()); + this(options.useSignedStringMinMax(), options.readInt96Stats()); } - private ParquetMetadataConverter(boolean useSignedStringMinMax) { - this(useSignedStringMinMax, ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH); + private ParquetMetadataConverter(boolean useSignedStringMinMax, boolean readInt96Stats) { + this(useSignedStringMinMax, ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH, readInt96Stats); } - private ParquetMetadataConverter(boolean useSignedStringMinMax, int statisticsTruncateLength) { + private ParquetMetadataConverter( + boolean useSignedStringMinMax, int statisticsTruncateLength, boolean readInt96Stats) { if (statisticsTruncateLength <= 0) { throw new IllegalArgumentException("Truncate length should be greater than 0"); } this.useSignedStringMinMax = useSignedStringMinMax; this.statisticsTruncateLength = statisticsTruncateLength; + this.readInt96Stats = readInt96Stats; } // NOTE: this cache is for memory savings, not cpu savings, and is used to de-duplicate @@ -600,8 +608,10 @@ private void addRowGroup( } if (columnMetaData.getStatistics() != null && !columnMetaData.getStatistics().isEmpty()) { - metaData.setStatistics( - toParquetStatistics(columnMetaData.getStatistics(), this.statisticsTruncateLength)); + metaData.setStatistics(toParquetStatistics( + parquetMetadata.getFileMetaData().getCreatedBy(), + columnMetaData.getStatistics(), + this.statisticsTruncateLength)); } if 
(columnMetaData.getEncodingStats() != null) { metaData.setEncoding_stats(convertEncodingStats(columnMetaData.getEncodingStats())); @@ -762,12 +772,12 @@ public List convertEncodingStats(EncodingStats stats) { return formatStats; } - public static Statistics toParquetStatistics(org.apache.parquet.column.statistics.Statistics stats) { - return toParquetStatistics(stats, ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH); + public Statistics toParquetStatistics(String createdBy, org.apache.parquet.column.statistics.Statistics stats) { + return toParquetStatistics(createdBy, stats, ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH); } - public static Statistics toParquetStatistics( - org.apache.parquet.column.statistics.Statistics stats, int truncateLength) { + public Statistics toParquetStatistics( + String createdBy, org.apache.parquet.column.statistics.Statistics stats, int truncateLength) { Statistics formatStats = new Statistics(); // Don't write stats larger than the max size rather than truncating. 
The // rationale is that some engines may use the minimum value in the page as @@ -795,7 +805,7 @@ public static Statistics toParquetStatistics( formatStats.setMax(max); } - if (isMinMaxStatsSupported(stats.type()) || Arrays.equals(min, max)) { + if (isMinMaxStatsReadingSupported(createdBy, stats.type()) || Arrays.equals(min, max)) { formatStats.setMin_value(min); formatStats.setMax_value(max); } @@ -859,10 +869,17 @@ private static byte[] tuncateMax(BinaryTruncator truncator, int truncateLength, .getBytes(); } - private static boolean isMinMaxStatsSupported(PrimitiveType type) { + private static boolean isMinMaxStatsWritingSupported(PrimitiveType type) { return type.columnOrder().getColumnOrderName() == ColumnOrderName.TYPE_DEFINED_ORDER; } + private boolean isMinMaxStatsReadingSupported(String createdBy, PrimitiveType type) { + if (type.getPrimitiveTypeName() == PrimitiveTypeName.INT96) { + return readInt96Stats && ValidInt96Stats.hasValidInt96Stats(createdBy); + } + return isMinMaxStatsWritingSupported(type); + } + /** * @param statistics parquet format statistics * @param type a primitive type name @@ -870,7 +887,7 @@ private static boolean isMinMaxStatsSupported(PrimitiveType type) { * @deprecated will be removed in 2.0.0. */ @Deprecated - public static org.apache.parquet.column.statistics.Statistics fromParquetStatistics( + public org.apache.parquet.column.statistics.Statistics fromParquetStatistics( Statistics statistics, PrimitiveTypeName type) { return fromParquetStatistics(null, statistics, type); } @@ -883,7 +900,7 @@ public static org.apache.parquet.column.statistics.Statistics fromParquetStatist * @deprecated will be removed in 2.0.0. 
*/ @Deprecated - public static org.apache.parquet.column.statistics.Statistics fromParquetStatistics( + public org.apache.parquet.column.statistics.Statistics fromParquetStatistics( String createdBy, Statistics statistics, PrimitiveTypeName type) { return fromParquetStatisticsInternal( createdBy, @@ -893,7 +910,7 @@ public static org.apache.parquet.column.statistics.Statistics fromParquetStatist } // Visible for testing - static org.apache.parquet.column.statistics.Statistics fromParquetStatisticsInternal( + org.apache.parquet.column.statistics.Statistics fromParquetStatisticsInternal( String createdBy, Statistics formatStats, PrimitiveType type, SortOrder typeSortOrder) { // create stats object based on the column type org.apache.parquet.column.statistics.Statistics.Builder statsBuilder = @@ -904,7 +921,7 @@ static org.apache.parquet.column.statistics.Statistics fromParquetStatisticsInte if (formatStats.isSetMin_value() && formatStats.isSetMax_value()) { byte[] min = formatStats.min_value.array(); byte[] max = formatStats.max_value.array(); - if (isMinMaxStatsSupported(type) || Arrays.equals(min, max)) { + if (isMinMaxStatsReadingSupported(createdBy, type) || Arrays.equals(min, max)) { statsBuilder.withMin(min); statsBuilder.withMax(max); } @@ -1991,8 +2008,7 @@ private void buildChildren( // the types // where ordering is not supported. 
if (columnOrder.getColumnOrderName() == ColumnOrderName.TYPE_DEFINED_ORDER - && (schemaElement.type == Type.INT96 - || schemaElement.converted_type == ConvertedType.INTERVAL)) { + && schemaElement.converted_type == ConvertedType.INTERVAL) { columnOrder = org.apache.parquet.schema.ColumnOrder.undefined(); } primitiveBuilder.columnOrder(columnOrder); @@ -2501,7 +2517,7 @@ private static org.apache.parquet.internal.column.columnindex.BoundaryOrder from public static ColumnIndex toParquetColumnIndex( PrimitiveType type, org.apache.parquet.internal.column.columnindex.ColumnIndex columnIndex) { - if (!isMinMaxStatsSupported(type) || columnIndex == null) { + if (!isMinMaxStatsWritingSupported(type) || columnIndex == null) { return null; } ColumnIndex parquetColumnIndex = new ColumnIndex( @@ -2521,9 +2537,9 @@ public static ColumnIndex toParquetColumnIndex( return parquetColumnIndex; } - public static org.apache.parquet.internal.column.columnindex.ColumnIndex fromParquetColumnIndex( - PrimitiveType type, ColumnIndex parquetColumnIndex) { - if (!isMinMaxStatsSupported(type)) { + public org.apache.parquet.internal.column.columnindex.ColumnIndex fromParquetColumnIndex( + String createdBy, PrimitiveType type, ColumnIndex parquetColumnIndex) { + if (!isMinMaxStatsReadingSupported(createdBy, type)) { return null; } return ColumnIndexBuilder.build( diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index b12a819cdd..50cac92f8a 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -1705,8 +1705,10 @@ public ColumnIndex readColumnIndex(ColumnChunkMetaData column) throws IOExceptio -1); } } - return ParquetMetadataConverter.fromParquetColumnIndex( - column.getPrimitiveType(), Util.readColumnIndex(f, columnIndexDecryptor, columnIndexAAD)); + 
return converter.fromParquetColumnIndex( + getFileMetaData().getCreatedBy(), + column.getPrimitiveType(), + Util.readColumnIndex(f, columnIndexDecryptor, columnIndexAAD)); } /** diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java index 8e05d49bd3..ebc2bfe3e0 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java @@ -122,6 +122,16 @@ public class ParquetInputFormat extends FileInputFormat { */ public static final String STATS_FILTERING_ENABLED = "parquet.filter.stats.enabled"; + /** + * key to configure whether int96 stats are read and exposed + */ + public static final String READ_INT96_STATS_ENABLED = "parquet.read.int96stats.enabled"; + + /** + * default value for READ_INT96_STATS_ENABLED + */ + public static final boolean DEFAULT_READ_INT96_STATS_ENABLED = true; + /** * key to configure whether row group dictionary filtering is enabled */ diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java index 2529f06ada..52e4c92035 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java @@ -150,6 +150,7 @@ public class TestParquetMetadataConverter { private static final String CHAR_UPPER = CHAR_LOWER.toUpperCase(); private static final String NUMBER = "0123456789"; private static final String DATA_FOR_RANDOM_STRING = CHAR_LOWER + CHAR_UPPER + NUMBER; + private static final String CREATED_BY = "parquet-mr"; @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -801,9 +802,9 @@ private void 
testBinaryStats(StatsHelper helper) { Assert.assertFalse("Min_value should not be set", formatStats.isSetMin_value()); Assert.assertFalse("Max_value should not be set", formatStats.isSetMax_value()); Assert.assertFalse("Num nulls should not be set", formatStats.isSetNull_count()); - - Statistics roundTripStats = ParquetMetadataConverter.fromParquetStatisticsInternal( - Version.FULL_VERSION, + ParquetMetadataConverter converter = new ParquetMetadataConverter(); + Statistics roundTripStats = converter.fromParquetStatisticsInternal( + CREATED_BY, formatStats, new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, ""), ParquetMetadataConverter.SortOrder.SIGNED); @@ -842,7 +843,7 @@ private void testBinaryStatsWithTruncation(int truncateLen, int minLen, int maxL stats.updateStats(Binary.fromConstantByteArray(min)); stats.updateStats(Binary.fromConstantByteArray(max)); ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter(truncateLen); - org.apache.parquet.format.Statistics formatStats = metadataConverter.toParquetStatistics(stats); + org.apache.parquet.format.Statistics formatStats = metadataConverter.toParquetStatistics(CREATED_BY, stats); if (minLen + maxLen >= ParquetMetadataConverter.MAX_STATS_SIZE) { assertNull(formatStats.getMin_value()); @@ -1051,7 +1052,7 @@ private void testStillUseStatsWithSignedSortOrderIfSingleValue(StatsHelper helpe PrimitiveType binaryType = Types.required(PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("b"); Statistics convertedStats = converter.fromParquetStatistics( - Version.FULL_VERSION, ParquetMetadataConverter.toParquetStatistics(stats), binaryType); + Version.FULL_VERSION, converter.toParquetStatistics(CREATED_BY, stats), binaryType); Assert.assertFalse("Stats should not be empty: " + convertedStats, convertedStats.isEmpty()); Assert.assertArrayEquals( @@ -1126,7 +1127,7 @@ public void testMissingValuesFromStats() { PrimitiveType type = Types.required(PrimitiveTypeName.INT32).named("test_int32"); 
org.apache.parquet.format.Statistics formatStats = new org.apache.parquet.format.Statistics(); - Statistics stats = converter.fromParquetStatistics(Version.FULL_VERSION, formatStats, type); + Statistics stats = converter.fromParquetStatistics(CREATED_BY, formatStats, type); assertFalse(stats.isNumNullsSet()); assertFalse(stats.hasNonNullValue()); assertTrue(stats.isEmpty()); @@ -1135,7 +1136,7 @@ public void testMissingValuesFromStats() { formatStats.clear(); formatStats.setMin(BytesUtils.intToBytes(-100)); formatStats.setMax(BytesUtils.intToBytes(100)); - stats = converter.fromParquetStatistics(Version.FULL_VERSION, formatStats, type); + stats = converter.fromParquetStatistics(CREATED_BY, formatStats, type); assertFalse(stats.isNumNullsSet()); assertTrue(stats.hasNonNullValue()); assertFalse(stats.isEmpty()); @@ -1145,7 +1146,7 @@ public void testMissingValuesFromStats() { formatStats.clear(); formatStats.setNull_count(2000); - stats = converter.fromParquetStatistics(Version.FULL_VERSION, formatStats, type); + stats = converter.fromParquetStatistics(CREATED_BY, formatStats, type); assertTrue(stats.isNumNullsSet()); assertFalse(stats.hasNonNullValue()); assertFalse(stats.isEmpty()); @@ -1169,7 +1170,8 @@ public void testSkippedV2Stats() { private void testSkippedV2Stats(PrimitiveType type, Object min, Object max) { Statistics stats = createStats(type, min, max); - org.apache.parquet.format.Statistics statistics = ParquetMetadataConverter.toParquetStatistics(stats); + ParquetMetadataConverter converter = new ParquetMetadataConverter(); + org.apache.parquet.format.Statistics statistics = converter.toParquetStatistics(CREATED_BY, stats); assertFalse(statistics.isSetMin()); assertFalse(statistics.isSetMax()); assertFalse(statistics.isSetMin_value()); @@ -1207,7 +1209,8 @@ public void testV2OnlyStats() { private void testV2OnlyStats(PrimitiveType type, Object min, Object max) { Statistics stats = createStats(type, min, max); - org.apache.parquet.format.Statistics 
statistics = ParquetMetadataConverter.toParquetStatistics(stats); + ParquetMetadataConverter converter = new ParquetMetadataConverter(); + org.apache.parquet.format.Statistics statistics = converter.toParquetStatistics(CREATED_BY, stats); assertFalse(statistics.isSetMin()); assertFalse(statistics.isSetMax()); assertEquals(ByteBuffer.wrap(stats.getMinBytes()), statistics.min_value); @@ -1249,7 +1252,8 @@ public void testV2StatsEqualMinMax() { private void testV2StatsEqualMinMax(PrimitiveType type, Object min, Object max) { Statistics stats = createStats(type, min, max); - org.apache.parquet.format.Statistics statistics = ParquetMetadataConverter.toParquetStatistics(stats); + ParquetMetadataConverter converter = new ParquetMetadataConverter(); + org.apache.parquet.format.Statistics statistics = converter.toParquetStatistics(CREATED_BY, stats); assertEquals(ByteBuffer.wrap(stats.getMinBytes()), statistics.min); assertEquals(ByteBuffer.wrap(stats.getMaxBytes()), statistics.max); assertEquals(ByteBuffer.wrap(stats.getMinBytes()), statistics.min_value); @@ -1340,7 +1344,8 @@ private enum StatsHelper { V1() { @Override public org.apache.parquet.format.Statistics toParquetStatistics(Statistics stats) { - org.apache.parquet.format.Statistics statistics = ParquetMetadataConverter.toParquetStatistics(stats); + ParquetMetadataConverter converter = new ParquetMetadataConverter(); + org.apache.parquet.format.Statistics statistics = converter.toParquetStatistics(CREATED_BY, stats); statistics.unsetMin_value(); statistics.unsetMax_value(); return statistics; @@ -1350,7 +1355,8 @@ public org.apache.parquet.format.Statistics toParquetStatistics(Statistics st V2() { @Override public org.apache.parquet.format.Statistics toParquetStatistics(Statistics stats) { - return ParquetMetadataConverter.toParquetStatistics(stats); + ParquetMetadataConverter converter = new ParquetMetadataConverter(); + return converter.toParquetStatistics(CREATED_BY, stats); } }; @@ -1447,9 +1453,11 @@ public 
void testColumnIndexConversion() { builder.add( stats, withSizeStats ? new SizeStatistics(type, 0, LongArrayList.of(5, 6), LongArrayList.of(2, 1)) : null); + ParquetMetadataConverter converter = new ParquetMetadataConverter(); org.apache.parquet.format.ColumnIndex parquetColumnIndex = - ParquetMetadataConverter.toParquetColumnIndex(type, builder.build()); - ColumnIndex columnIndex = ParquetMetadataConverter.fromParquetColumnIndex(type, parquetColumnIndex); + converter.toParquetColumnIndex(type, builder.build()); + + ColumnIndex columnIndex = converter.fromParquetColumnIndex(CREATED_BY, type, parquetColumnIndex); assertEquals(BoundaryOrder.ASCENDING, columnIndex.getBoundaryOrder()); assertTrue(Arrays.asList(false, true, false).equals(columnIndex.getNullPages())); assertTrue(Arrays.asList(16l, 111l, 0l).equals(columnIndex.getNullCounts())); @@ -1463,18 +1471,18 @@ public void testColumnIndexConversion() { ByteBuffer.allocate(0), ByteBuffer.wrap(BytesUtils.longToBytes(500l))) .equals(columnIndex.getMaxValues())); - assertNull( "Should handle null column index", - ParquetMetadataConverter.toParquetColumnIndex( + converter.toParquetColumnIndex( Types.required(PrimitiveTypeName.INT32).named("test_int32"), null)); assertNull( "Should ignore unsupported types", - ParquetMetadataConverter.toParquetColumnIndex( + converter.toParquetColumnIndex( Types.required(PrimitiveTypeName.INT96).named("test_int96"), columnIndex)); assertNull( "Should ignore unsupported types", - ParquetMetadataConverter.fromParquetColumnIndex( + converter.fromParquetColumnIndex( + CREATED_BY, Types.required(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) .length(12) .as(OriginalType.INTERVAL) diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java index 3126e1746f..965b6e017d 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java +++ 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java @@ -82,7 +82,6 @@ import org.apache.parquet.column.values.bloomfilter.BloomFilter; import org.apache.parquet.example.data.Group; import org.apache.parquet.example.data.simple.SimpleGroup; -import org.apache.parquet.format.Statistics; import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel; import org.apache.parquet.hadoop.example.GroupReadSupport; import org.apache.parquet.hadoop.example.GroupWriteSupport; @@ -869,11 +868,11 @@ public void testConvertToThriftStatistics() throws Exception { parquetMRstats.updateStats(l); } final String createdBy = "parquet-mr version 1.8.0 (build d4d5a07ec9bd262ca1e93c309f1d7d4a74ebda4c)"; - Statistics thriftStats = - org.apache.parquet.format.converter.ParquetMetadataConverter.toParquetStatistics(parquetMRstats); + org.apache.parquet.format.converter.ParquetMetadataConverter converter = + new org.apache.parquet.format.converter.ParquetMetadataConverter(); + org.apache.parquet.format.Statistics thriftStats = converter.toParquetStatistics(createdBy, parquetMRstats); LongStatistics convertedBackStats = - (LongStatistics) org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics( - createdBy, thriftStats, PrimitiveTypeName.INT64); + (LongStatistics) converter.fromParquetStatistics(createdBy, thriftStats, PrimitiveTypeName.INT64); assertEquals(parquetMRstats.getMax(), convertedBackStats.getMax()); assertEquals(parquetMRstats.getMin(), convertedBackStats.getMin()); diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java index f2cf26d868..20889f9664 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java @@ -20,13 +20,14 @@ package org.apache.parquet.statistics; import java.math.BigInteger; 
+import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.ArrayList; import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Random; import java.util.function.Supplier; -import org.apache.parquet.FixedBinaryTestUtils; import org.apache.parquet.io.api.Binary; public class RandomValues { @@ -213,11 +214,11 @@ public Long nextValue() { } } - public static class Int96Generator extends RandomBinaryBase { - private final RandomRange randomRange = new RandomRange(randomInt96(), randomInt96()); - private final BigInteger minimum = randomRange.minimum(); - private final BigInteger maximum = randomRange.maximum(); - private final BigInteger range = maximum.subtract(minimum); + public static class Int96Generator extends RandomBinaryBase { + private final RandomRange randomRangeJulianDay = new RandomRange<>(randomInt(), randomInt()); + private final int minimumJulianDay = randomRangeJulianDay.minimum(); + private final int maximumJulianDay = randomRangeJulianDay.maximum(); + private final int rangeJulianDay = (maximumJulianDay - minimumJulianDay); private static final int INT_96_LENGTH = 12; @@ -226,13 +227,21 @@ public Int96Generator(long seed) { } @Override - public BigInteger nextValue() { - return (minimum.add(randomInt96(range))); + public Binary nextValue() { + long timeOfDay = randomLong(); + int julianDay = minimumJulianDay + randomPositiveInt(rangeJulianDay); + + ByteBuffer.wrap(buffer) + .order(ByteOrder.LITTLE_ENDIAN) + .putLong(timeOfDay) + .putInt(julianDay); + + return Binary.fromReusedByteArray(buffer, 0, INT_96_LENGTH); } @Override public Binary nextBinaryValue() { - return FixedBinaryTestUtils.getFixedBinary(INT_96_LENGTH, nextValue()); + return nextValue(); } } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestInt96TimestampStatisticsRoundTrip.java b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestInt96TimestampStatisticsRoundTrip.java new file mode 100644 index 
0000000000..13284da5d7 --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestInt96TimestampStatisticsRoundTrip.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.statistics; + +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT96; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.NanoTime; +import org.apache.parquet.example.data.simple.SimpleGroup; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.io.api.Binary; +import 
org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Types; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestInt96TimestampStatisticsRoundTrip { + @Rule + public final TemporaryFolder temp = new TemporaryFolder(); + + private MessageType createSchema() { + return Types.buildMessage().required(INT96).named("timestamp_field").named("root"); + } + + /** + * Convert a timestamp string in format "yyyy-MM-ddTHH:mm:ss.SSS" to INT96 bytes using NanoTime. + * INT96 timestamps in Parquet are encoded as 12 bytes where: + * - First 8 bytes: nanoseconds from midnight + * - Last 4 bytes: Julian day + */ + private Binary timestampToInt96(String timestamp) { + LocalDateTime dt = LocalDateTime.parse(timestamp); + long julianDay = dt.toLocalDate().toEpochDay() + 2440588; // Convert to Julian Day + long nanos = dt.toLocalTime().toNanoOfDay(); + return new NanoTime((int) julianDay, nanos).toBinary(); + } + + private void writeParquetFile(Path file, List timestampValues) throws IOException { + MessageType schema = createSchema(); + Configuration conf = new Configuration(); + try (ParquetWriter writer = ExampleParquetWriter.builder(file) + .withConf(conf) + .withType(schema) + .build()) { + for (Binary value : timestampValues) { + Group group = new SimpleGroup(schema); + group.add("timestamp_field", value); + writer.write(group); + } + } + } + + private void verifyStatistics(Path file, Binary minValue, Binary maxValue, boolean readInt96Stats) + throws IOException { + Configuration conf = new Configuration(); + conf.set("parquet.read.int96stats.enabled", readInt96Stats ? 
"true" : "false"); + ParquetMetadata metadata = ParquetFileReader.readFooter(conf, file); + + // Verify INT96 statistics + ColumnChunkMetaData timestampColumn = + metadata.getBlocks().get(0).getColumns().get(0); + Statistics timestampStats = timestampColumn.getStatistics(); + + if (readInt96Stats) { + assertTrue("INT96 statistics have non-null values", timestampStats.hasNonNullValue()); + assertEquals(Binary.fromConstantByteArray(timestampStats.getMinBytes()), minValue); + assertEquals(Binary.fromConstantByteArray(timestampStats.getMaxBytes()), maxValue); + } else { + assertTrue("INT96 statistics should not be present", !timestampStats.hasNonNullValue()); + return; + } + } + + private void runTimestampTest(String[] timestamps) throws IOException { + Binary minValue = timestampToInt96(timestamps[0]); + Binary maxValue = timestampToInt96(timestamps[timestamps.length - 1]); + List timestampValues = new ArrayList<>(); + for (String timestamp : timestamps) { + timestampValues.add(timestampToInt96(timestamp)); + } + Collections.shuffle(timestampValues); + + Path file = new Path(temp.getRoot().getPath(), "test_timestamps.parquet"); + writeParquetFile(file, timestampValues); + verifyStatistics(file, minValue, maxValue, false); + verifyStatistics(file, minValue, maxValue, true); + } + + @Test + public void testMultipleDates() throws IOException { + String[] timestamps = { + "2020-01-01T00:00:00.000", + "2020-02-29T23:59:59.000", + "2020-12-31T23:59:59.000", + "2021-01-01T00:00:00.000", + "2023-06-15T12:30:45.000", + "2024-02-29T15:45:30.000", + "2024-12-25T07:00:00.000", + "2025-01-01T00:00:00.000", + "2025-07-04T20:00:00.000", + "2025-12-31T23:59:59.000" + }; + runTimestampTest(timestamps); + } + + @Test + public void testSameDayDifferentTime() throws IOException { + String[] timestamps = {"2020-01-01T00:01:00.000", "2020-01-01T00:02:00.000", "2020-01-01T00:03:00.000"}; + runTimestampTest(timestamps); + } + + @Test + public void testIncreasingDayDecreasingTime() throws 
IOException { + String[] timestamps = {"2020-01-01T12:00:00.000", "2020-02-01T11:00:00.000", "2020-03-01T10:00:00.000"}; + runTimestampTest(timestamps); + } +}