diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java index f29214b458..77014e92ce 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java @@ -68,6 +68,7 @@ public class ParquetProperties { public static final boolean DEFAULT_SIZE_STATISTICS_ENABLED = true; public static final boolean DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED = true; + public static final double DEFAULT_PAGE_COMPRESS_THRESHOLD = 0.98; /** * @deprecated This shared instance can cause thread safety issues when used by multiple builders concurrently. @@ -120,6 +121,7 @@ public static WriterVersion fromString(String name) { private final int statisticsTruncateLength; private final boolean statisticsEnabled; private final boolean sizeStatisticsEnabled; + private final double pageCompressThreshold; // The expected NDV (number of distinct values) for each columns private final ColumnProperty bloomFilterNDVs; @@ -154,6 +156,8 @@ private ParquetProperties(Builder builder) { this.statisticsTruncateLength = builder.statisticsTruncateLength; this.statisticsEnabled = builder.statisticsEnabled; this.sizeStatisticsEnabled = builder.sizeStatisticsEnabled; + this.pageCompressThreshold = builder.pageCompressThreshold; + this.bloomFilterNDVs = builder.bloomFilterNDVs.build(); this.bloomFilterFPPs = builder.bloomFilterFPPs.build(); this.bloomFilterEnabled = builder.bloomFilterEnabled.build(); @@ -322,6 +326,10 @@ public boolean getPageWriteChecksumEnabled() { return pageWriteChecksumEnabled; } + public double pageCompressThreshold() { + return pageCompressThreshold; + } + public OptionalLong getBloomFilterNDV(ColumnDescriptor column) { Long ndv = bloomFilterNDVs.getValue(column); return ndv == null ? OptionalLong.empty() : OptionalLong.of(ndv); @@ -388,7 +396,8 @@ public String toString() { + "Page row count limit to " + getPageRowCountLimit() + '\n' + "Writing page checksums is: " + (getPageWriteChecksumEnabled() ? "on" : "off") + '\n' + "Statistics enabled: " + statisticsEnabled + '\n' - + "Size statistics enabled: " + sizeStatisticsEnabled; + + "Size statistics enabled: " + sizeStatisticsEnabled + '\n' + + "Page compress threshold: " + pageCompressThreshold; } public static class Builder { @@ -406,6 +415,7 @@ public static class Builder { private int statisticsTruncateLength = DEFAULT_STATISTICS_TRUNCATE_LENGTH; private boolean statisticsEnabled = DEFAULT_STATISTICS_ENABLED; private boolean sizeStatisticsEnabled = DEFAULT_SIZE_STATISTICS_ENABLED; + private double pageCompressThreshold = DEFAULT_PAGE_COMPRESS_THRESHOLD; private final ColumnProperty.Builder bloomFilterNDVs; private final ColumnProperty.Builder bloomFilterFPPs; private int maxBloomFilterBytes = DEFAULT_MAX_BLOOM_FILTER_BYTES; @@ -460,6 +470,7 @@ private Builder(ParquetProperties toCopy) { this.extraMetaData = toCopy.extraMetaData; this.statistics = ColumnProperty.builder(toCopy.statistics); this.sizeStatistics = ColumnProperty.builder(toCopy.sizeStatistics); + this.pageCompressThreshold = toCopy.pageCompressThreshold(); } /** @@ -756,6 +767,21 @@ public Builder withSizeStatisticsEnabled(String columnPath, boolean enabled) { return this; } + /** + * Sets the compression threshold for data pages, only effect for V2 pages. + * + *
<p>
When the compression ratio (compressed size / uncompressed size) exceeds this threshold, + * the uncompressed data will be used instead. For example, with a threshold of 0.98, if + * compression only saves 2% of space, the data will not be compressed. + * + * @param threshold the compression ratio threshold, default is {@value #DEFAULT_PAGE_COMPRESS_THRESHOLD} + * @return this builder for method chaining + */ + public Builder withPageCompressThreshold(double threshold) { + this.pageCompressThreshold = threshold; + return this; + } + public ParquetProperties build() { ParquetProperties properties = new ParquetProperties(this); // we pass a constructed but uninitialized factory to ParquetProperties above as currently diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java index d9e6ea0990..5808307985 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java @@ -53,6 +53,7 @@ import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.hadoop.CodecFactory.BytesCompressor; import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder; import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder; import org.apache.parquet.io.ParquetEncodingException; @@ -67,12 +68,13 @@ public class ColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore { private static final Logger LOG = LoggerFactory.getLogger(ColumnChunkPageWriteStore.class); - private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter(); + private static final ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter(); private static final class ColumnChunkPageWriter implements PageWriter, BloomFilterWriter { private final ColumnDescriptor path; private final BytesInputCompressor compressor; + private final double pageCompressThreshold; private final ByteArrayOutputStream tempOutputStream = new ByteArrayOutputStream(); private final ConcatenatingByteBufferCollector buf; @@ -84,20 +86,20 @@ private static final class ColumnChunkPageWriter implements PageWriter, BloomFil private int pageCount; // repetition and definition level encodings are used only for v1 pages and don't change - private Set rlEncodings = new HashSet(); - private Set dlEncodings = new HashSet(); - private List dataEncodings = new ArrayList(); + private final Set rlEncodings = new HashSet<>(); + private final Set dlEncodings = new HashSet<>(); + private final List dataEncodings = new ArrayList<>(); private BloomFilter bloomFilter; private ColumnIndexBuilder columnIndexBuilder; private OffsetIndexBuilder offsetIndexBuilder; - private Statistics totalStatistics; + private Statistics totalStatistics; private final SizeStatistics totalSizeStatistics; private final GeospatialStatistics totalGeospatialStatistics; private final ByteBufferReleaser releaser; private final CRC32 crc; - boolean pageWriteChecksumEnabled; + private final boolean pageWriteChecksumEnabled; private final BlockCipher.Encryptor headerBlockEncryptor; private final BlockCipher.Encryptor pageBlockEncryptor; @@ -118,9 +120,11 @@ private ColumnChunkPageWriter( BlockCipher.Encryptor pageBlockEncryptor, 
byte[] fileAAD, int rowGroupOrdinal, - int columnOrdinal) { + int columnOrdinal, + double pageCompressThreshold) { this.path = path; this.compressor = compressor; + this.pageCompressThreshold = pageCompressThreshold; this.releaser = new ByteBufferReleaser(allocator); this.buf = new ConcatenatingByteBufferCollector(allocator); this.columnIndexBuilder = ColumnIndexBuilder.getBuilder(path.getPrimitiveType(), columnIndexTruncateLength); @@ -305,9 +309,14 @@ public void writePageV2( boolean compressed = false; BytesInput compressedData = BytesInput.empty(); if (data.size() > 0) { - // TODO: decide if we compress compressedData = compressor.compress(data); compressed = true; + double compressionRatio = (double) compressedData.size() / data.size(); + if (compressor.getCodecName() != CompressionCodecName.UNCOMPRESSED + && compressionRatio > pageCompressThreshold) { + compressedData = data; + compressed = false; + } } if (null != pageBlockEncryptor) { AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal); @@ -529,10 +538,75 @@ public void writeBloomFilter(BloomFilter bloomFilter) { } } - private final Map writers = - new HashMap(); + private final Map writers = new HashMap<>(); private final MessageType schema; + private ColumnChunkPageWriteStore( + BytesInputCompressor compressor, + MessageType schema, + ByteBufferAllocator allocator, + int columnIndexTruncateLength, + boolean pageWriteChecksumEnabled, + InternalFileEncryptor fileEncryptor, + int rowGroupOrdinal, + double pageCompressThreshold) { + this.schema = schema; + + if (null == fileEncryptor) { + for (ColumnDescriptor path : schema.getColumns()) { + writers.put( + path, + new ColumnChunkPageWriter( + path, + compressor, + allocator, + columnIndexTruncateLength, + pageWriteChecksumEnabled, + null, + null, + null, + -1, + -1, + pageCompressThreshold)); + } + } else { + // Encrypted file + int columnOrdinal = -1; + byte[] fileAAD = fileEncryptor.getFileAAD(); + for (ColumnDescriptor path : schema.getColumns()) { + columnOrdinal++; + BlockCipher.Encryptor headerBlockEncryptor = null; + BlockCipher.Encryptor pageBlockEncryptor = null; + ColumnPath columnPath = ColumnPath.get(path.getPath()); + + InternalColumnEncryptionSetup columnSetup = + fileEncryptor.getColumnSetup(columnPath, true, columnOrdinal); + if (columnSetup.isEncrypted()) { + headerBlockEncryptor = columnSetup.getMetaDataEncryptor(); + pageBlockEncryptor = columnSetup.getDataEncryptor(); + } + + writers.put( + path, + new ColumnChunkPageWriter( + path, + compressor, + allocator, + columnIndexTruncateLength, + pageWriteChecksumEnabled, + headerBlockEncryptor, + pageBlockEncryptor, + fileAAD, + rowGroupOrdinal, + columnOrdinal, + pageCompressThreshold)); + } + } + } + + /** + * @deprecated will be removed in 2.0.0. Use build method instead. + */ @Deprecated public ColumnChunkPageWriteStore( BytesCompressor compressor, @@ -547,6 +621,10 @@ public ColumnChunkPageWriteStore( ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED); } + /** + * @deprecated will be removed in 2.0.0. Use build method instead. + */ + @Deprecated public ColumnChunkPageWriteStore( BytesInputCompressor compressor, MessageType schema, @@ -560,6 +638,9 @@ public ColumnChunkPageWriteStore( ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED); } + /** + * @deprecated will be removed in 2.0.0. Use build method instead. 
+ */ @Deprecated public ColumnChunkPageWriteStore( BytesCompressor compressor, @@ -570,30 +651,22 @@ public ColumnChunkPageWriteStore( this((BytesInputCompressor) compressor, schema, allocator, columnIndexTruncateLength, pageWriteChecksumEnabled); } + /** + * @deprecated will be removed in 2.0.0. Use build method instead. + */ + @Deprecated public ColumnChunkPageWriteStore( BytesInputCompressor compressor, MessageType schema, ByteBufferAllocator allocator, int columnIndexTruncateLength, boolean pageWriteChecksumEnabled) { - this.schema = schema; - for (ColumnDescriptor path : schema.getColumns()) { - writers.put( - path, - new ColumnChunkPageWriter( - path, - compressor, - allocator, - columnIndexTruncateLength, - pageWriteChecksumEnabled, - null, - null, - null, - -1, - -1)); - } + this(compressor, schema, allocator, columnIndexTruncateLength, pageWriteChecksumEnabled, null, -1); } + /** + * @deprecated will be removed in 2.0.0. Use build method instead. + */ @Deprecated public ColumnChunkPageWriteStore( BytesCompressor compressor, @@ -613,6 +686,10 @@ public ColumnChunkPageWriteStore( rowGroupOrdinal); } + /** + * @deprecated will be removed in 2.0.0. Use build method instead. + */ + @Deprecated public ColumnChunkPageWriteStore( BytesInputCompressor compressor, MessageType schema, @@ -621,55 +698,15 @@ public ColumnChunkPageWriteStore( boolean pageWriteChecksumEnabled, InternalFileEncryptor fileEncryptor, int rowGroupOrdinal) { - this.schema = schema; - if (null == fileEncryptor) { - for (ColumnDescriptor path : schema.getColumns()) { - writers.put( - path, - new ColumnChunkPageWriter( - path, - compressor, - allocator, - columnIndexTruncateLength, - pageWriteChecksumEnabled, - null, - null, - null, - -1, - -1)); - } - return; - } - - // Encrypted file - int columnOrdinal = -1; - byte[] fileAAD = fileEncryptor.getFileAAD(); - for (ColumnDescriptor path : schema.getColumns()) { - columnOrdinal++; - BlockCipher.Encryptor headerBlockEncryptor = null; - BlockCipher.Encryptor pageBlockEncryptor = null; - ColumnPath columnPath = ColumnPath.get(path.getPath()); - - InternalColumnEncryptionSetup columnSetup = fileEncryptor.getColumnSetup(columnPath, true, columnOrdinal); - if (columnSetup.isEncrypted()) { - headerBlockEncryptor = columnSetup.getMetaDataEncryptor(); - pageBlockEncryptor = columnSetup.getDataEncryptor(); - } - - writers.put( - path, - new ColumnChunkPageWriter( - path, - compressor, - allocator, - columnIndexTruncateLength, - pageWriteChecksumEnabled, - headerBlockEncryptor, - pageBlockEncryptor, - fileAAD, - rowGroupOrdinal, - columnOrdinal)); - } + this( + compressor, + schema, + allocator, + columnIndexTruncateLength, + pageWriteChecksumEnabled, + fileEncryptor, + rowGroupOrdinal, + Double.MAX_VALUE); } @Override @@ -694,4 +731,63 @@ public void flushToFileWriter(ParquetFileWriter writer) throws IOException { pageWriter.writeToFileWriter(writer); } } + + public static Builder build(BytesInputCompressor compressor, MessageType schema, ByteBufferAllocator allocator) { + return new Builder(compressor, schema, allocator); + } + + public static class Builder { + private final MessageType schema; + private final BytesInputCompressor compressor; + private final ByteBufferAllocator allocator; + + private int columnIndexTruncateLength = ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH; + private boolean pageWriteChecksumEnabled = ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED; + private InternalFileEncryptor fileEncryptor = null; + private int rowGroupOrdinal = -1; + 
private double pageCompressThreshold = Double.MAX_VALUE; + + public Builder(BytesInputCompressor compressor, MessageType schema, ByteBufferAllocator allocator) { + this.compressor = compressor; + this.schema = schema; + this.allocator = allocator; + } + + public Builder withColumnIndexTruncateLength(int newLength) { + this.columnIndexTruncateLength = newLength; + return this; + } + + public Builder withPageWriteChecksumEnabled(boolean newPageWriteChecksumEnabled) { + this.pageWriteChecksumEnabled = newPageWriteChecksumEnabled; + return this; + } + + public Builder withFileEncryptor(InternalFileEncryptor newFileEncryptor) { + this.fileEncryptor = newFileEncryptor; + return this; + } + + public Builder withRowGroupOrdinal(int newRowGroupOrdinal) { + this.rowGroupOrdinal = newRowGroupOrdinal; + return this; + } + + public Builder withPageCompressThreshold(double newPageCompressThreshold) { + this.pageCompressThreshold = newPageCompressThreshold; + return this; + } + + public ColumnChunkPageWriteStore build() { + return new ColumnChunkPageWriteStore( + compressor, + schema, + allocator, + columnIndexTruncateLength, + pageWriteChecksumEnabled, + fileEncryptor, + rowGroupOrdinal, + pageCompressThreshold); + } + } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java index 41b068d01a..c3e6a8544c 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java @@ -108,14 +108,14 @@ public ParquetMetadata getFooter() { } private void initStore() { - ColumnChunkPageWriteStore columnChunkPageWriteStore = new ColumnChunkPageWriteStore( - compressor, - schema, - props.getAllocator(), - props.getColumnIndexTruncateLength(), - props.getPageWriteChecksumEnabled(), - fileEncryptor, - rowGroupOrdinal); + ColumnChunkPageWriteStore columnChunkPageWriteStore = ColumnChunkPageWriteStore.build( + compressor, schema, props.getAllocator()) + .withColumnIndexTruncateLength(props.getColumnIndexTruncateLength()) + .withPageWriteChecksumEnabled(props.getPageWriteChecksumEnabled()) + .withFileEncryptor(fileEncryptor) + .withRowGroupOrdinal(rowGroupOrdinal) + .withPageCompressThreshold(props.pageCompressThreshold()) + .build(); pageStore = columnChunkPageWriteStore; bloomFilterWriteStore = columnChunkPageWriteStore; diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java index 8eb5f7f17b..a453a6bfa8 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java @@ -976,6 +976,21 @@ public SELF withSizeStatisticsEnabled(boolean enabled) { return self(); } + /** + * Sets the compression threshold for data pages, only effect for V2 pages. + * + *
<p>
When the compression ratio (compressed size / uncompressed size) exceeds this threshold, + * the uncompressed data will be used instead. For example, with a threshold of 0.98, if + * compression only saves 2% of space, the data will not be compressed. + * + * @param threshold the compression ratio threshold, default is {@link ParquetProperties#DEFAULT_PAGE_COMPRESS_THRESHOLD} + * @return this builder for method chaining + */ + public SELF withPageCompressThreshold(double threshold) { + encodingPropsBuilder.withPageCompressThreshold(threshold); + return self(); + } + /** * Build a {@link ParquetWriter} with the accumulated configuration. * diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java index cd82cf4a8b..3ab1c9c713 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java @@ -755,6 +755,12 @@ private void processChunk( } readValues += headerV2.getNum_values(); readRows += headerV2.getNum_rows(); + boolean compressed; + if (compressor == null) { + compressed = headerV2.is_compressed; + } else { + compressed = true; + } writer.writeDataPageV2( headerV2.getNum_rows(), headerV2.getNum_nulls(), @@ -763,7 +769,7 @@ private void processChunk( dlLevels, converter.getEncoding(headerV2.getEncoding()), BytesInput.from(pageLoad), - headerV2.is_compressed, + compressed, rawDataLength, statistics, metaEncryptor, @@ -981,14 +987,14 @@ private void nullifyColumn( // Create new schema that only has the current column MessageType newSchema = getSchemaWithRenamedColumns(newSchema(outSchema, descriptor)); - ColumnChunkPageWriteStore cPageStore = new ColumnChunkPageWriteStore( - compressor, - newSchema, - props.getAllocator(), - props.getColumnIndexTruncateLength(), - props.getPageWriteChecksumEnabled(), - nullColumnEncryptor, - numBlocksRewritten); + ColumnChunkPageWriteStore cPageStore = ColumnChunkPageWriteStore.build( + compressor, newSchema, props.getAllocator()) + .withColumnIndexTruncateLength(props.getColumnIndexTruncateLength()) + .withPageWriteChecksumEnabled(props.getPageWriteChecksumEnabled()) + .withFileEncryptor(nullColumnEncryptor) + .withRowGroupOrdinal(numBlocksRewritten) + .withPageCompressThreshold(props.pageCompressThreshold()) + .build(); ColumnWriteStore cStore = props.newColumnWriteStore(newSchema, cPageStore); ColumnWriter cWriter = cStore.getColumnWriter(descriptor); diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java index 2b037b5261..8ee0235858 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java @@ -32,6 +32,7 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isNull; @@ -41,10 +42,13 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.file.Files; import 
java.util.HashMap; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.bytes.DirectByteBufferAllocator; @@ -62,15 +66,20 @@ import org.apache.parquet.column.statistics.BinaryStatistics; import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor; +import org.apache.parquet.format.PageHeader; +import org.apache.parquet.format.Util; import org.apache.parquet.hadoop.ParquetFileWriter.Mode; +import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.hadoop.util.HadoopInputFile; import org.apache.parquet.hadoop.util.HadoopOutputFile; import org.apache.parquet.internal.column.columnindex.ColumnIndex; import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder; import org.apache.parquet.internal.column.columnindex.OffsetIndex; import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder; +import org.apache.parquet.io.InputFile; import org.apache.parquet.io.OutputFile; import org.apache.parquet.io.PositionOutputStream; import org.apache.parquet.schema.MessageType; @@ -136,7 +145,9 @@ public void initConfiguration() { @After public void closeAllocator() { - allocator.close(); + if (allocator != null) { + allocator.close(); + } } @Test @@ -154,13 +165,7 @@ public void testWithDirectBuffers() throws Exception { } public void test(Configuration config, ByteBufferAllocator allocator) throws Exception { - Path file = new Path("target/test/TestColumnChunkPageWriteStore/test.parquet"); - Path root = file.getParent(); - FileSystem fs = file.getFileSystem(config); - if (fs.exists(root)) { - fs.delete(root, true); - } - fs.mkdirs(root); + Path file = createTestFile("test.parquet"); MessageType schema = MessageTypeParser.parseMessageType("message test { repeated binary bar; }"); ColumnDescriptor col = schema.getColumns().get(0); Encoding dataEncoding = PLAIN; @@ -194,8 +199,10 @@ public void test(Configuration config, ByteBufferAllocator allocator) throws Exc writer.start(); writer.startBlock(rowCount); pageOffset = outputFile.out().getPos(); - try (ColumnChunkPageWriteStore store = - new ColumnChunkPageWriteStore(compressor(GZIP), schema, allocator, Integer.MAX_VALUE)) { + ColumnChunkPageWriteStore.Builder builder = ColumnChunkPageWriteStore.build( + compressor(GZIP), schema, allocator) + .withColumnIndexTruncateLength(Integer.MAX_VALUE); + try (ColumnChunkPageWriteStore store = builder.build()) { PageWriter pageWriter = store.getPageWriter(col); pageWriter.writePageV2( rowCount, @@ -279,11 +286,12 @@ public void testColumnOrderV1() throws IOException { int fakeCount = 3; BinaryStatistics fakeStats = new BinaryStatistics(); - try (ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore( - compressor(UNCOMPRESSED), - schema, - allocator = TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator()), - Integer.MAX_VALUE)) { + ColumnChunkPageWriteStore.Builder builder = ColumnChunkPageWriteStore.build( + compressor(UNCOMPRESSED), + schema, + TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator())) + .withColumnIndexTruncateLength(Integer.MAX_VALUE); 
+ try (ColumnChunkPageWriteStore store = builder.build()) { for (ColumnDescriptor col : schema.getColumns()) { PageWriter pageWriter = store.getPageWriter(col); @@ -316,7 +324,173 @@ public void testColumnOrderV1() throws IOException { } } + @Test + public void testV2PageCompressThreshold() throws Exception { + MessageType schema = MessageTypeParser.parseMessageType("message test { required int32 data; }"); + ColumnDescriptor col = schema.getColumns().get(0); + + int valueCount = 100; + int rowCount = 100; + int nullCount = 0; + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + byte[] value = + ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt(0).array(); + for (int i = 0; i < valueCount; i++) { + baos.write(value); + } + BytesInput data = BytesInput.from(baos.toByteArray()); + + BytesInput definitionLevels = BytesInput.empty(); + BytesInput repetitionLevels = BytesInput.empty(); + Statistics statistics = Statistics.getBuilderForReading( + Types.required(INT32).named("data")) + .build(); + statistics.setMinMaxFromBytes(value, value); + + Path file = writeSingleColumnData( + "lowCompressThreshold", + schema, + col, + rowCount, + nullCount, + valueCount, + repetitionLevels, + definitionLevels, + data, + statistics, + 0.01); + ParquetMetadata footer = ParquetFileReader.readFooter(conf, file, NO_FILTER); + ColumnChunkMetaData columnMeta = footer.getBlocks().get(0).getColumns().get(0); + + long uncompressedSize = columnMeta.getTotalUncompressedSize(); + long compressedSize = columnMeta.getTotalSize(); + + assertEquals( + "Data should be stored uncompressed when compression ratio exceeds threshold", + uncompressedSize, + compressedSize); + + verifyReadData(file, col, rowCount, nullCount, valueCount, data, false); + + file = writeSingleColumnData( + "highCompressThreshold", + schema, + col, + rowCount, + nullCount, + valueCount, + repetitionLevels, + definitionLevels, + data, + statistics, + 1.0); + footer = ParquetFileReader.readFooter(conf, file, NO_FILTER); + columnMeta = footer.getBlocks().get(0).getColumns().get(0); + + uncompressedSize = columnMeta.getTotalUncompressedSize(); + compressedSize = columnMeta.getTotalSize(); + + assertTrue( + "Data should be stored compressed when compression ratio does not exceed threshold", + uncompressedSize > compressedSize); + + verifyReadData(file, col, rowCount, nullCount, valueCount, data, true); + } + + private Path writeSingleColumnData( + String filePrefix, + MessageType schema, + ColumnDescriptor col, + int rowCount, + int nullCount, + int valueCount, + BytesInput repetitionLevels, + BytesInput definitionLevels, + BytesInput data, + Statistics statistics, + double pageCompressThreshold) + throws Exception { + Path file = createTestFile(filePrefix); + OutputFileForTesting outputFile = new OutputFileForTesting(file, conf); + ParquetProperties props = ParquetProperties.builder() + .withAllocator(TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator())) + .withPageCompressThreshold(pageCompressThreshold) + .build(); + + ParquetFileWriter writer = new ParquetFileWriter( + outputFile, + schema, + Mode.OVERWRITE, + ParquetWriter.DEFAULT_BLOCK_SIZE, + ParquetWriter.MAX_PADDING_SIZE_DEFAULT, + null, + props); + writer.start(); + writer.startBlock(rowCount); + + ColumnChunkPageWriteStore.Builder builder = ColumnChunkPageWriteStore.build( + compressor(GZIP), schema, props.getAllocator()) + .withColumnIndexTruncateLength(Integer.MAX_VALUE) + .withPageCompressThreshold(pageCompressThreshold); + + try (ColumnChunkPageWriteStore store = 
builder.build()) { + PageWriter pageWriter = store.getPageWriter(col); + pageWriter.writePageV2( + rowCount, nullCount, valueCount, repetitionLevels, definitionLevels, PLAIN, data, statistics); + store.flushToFileWriter(writer); + } + + writer.endBlock(); + writer.end(new HashMap<>()); + + return file; + } + private BytesInputCompressor compressor(CompressionCodecName codec) { return new CodecFactory(conf, pageSize).getCompressor(codec); } + + private Path createTestFile(String prefix) throws Exception { + java.nio.file.Path path = Files.createTempFile(prefix, ".tmp").toAbsolutePath(); + Files.deleteIfExists(path); + return new Path(path.toString()); + } + + private void verifyReadData( + Path file, + ColumnDescriptor col, + int expectedRowCount, + int expectedNullCount, + int expectedValueCount, + BytesInput expectedData, + boolean expectedCompressed) + throws IOException { + InputFile inputFile = HadoopInputFile.fromPath(file, conf); + ParquetReadOptions options = ParquetReadOptions.builder().build(); + try (ParquetFileReader reader = ParquetFileReader.open(inputFile, options)) { + BlockMetaData blockMetaData = reader.getFooter().getBlocks().get(0); + reader.f.seek(blockMetaData.getStartingPos()); + + PageHeader pageHeader = Util.readPageHeader(reader.f); + assertEquals( + "Page compressed mismatch", + expectedCompressed, + pageHeader.getData_page_header_v2().isIs_compressed()); + } + + try (ParquetFileReader reader = new ParquetFileReader(inputFile, options)) { + PageReadStore rowGroup = reader.readNextRowGroup(); + PageReader pageReader = rowGroup.getPageReader(col); + DataPageV2 page = (DataPageV2) pageReader.readPage(); + + assertEquals("Row count mismatch", expectedRowCount, page.getRowCount()); + assertEquals("Null count mismatch", expectedNullCount, page.getNullCount()); + assertEquals("Value count mismatch", expectedValueCount, page.getValueCount()); + + BytesInput actualData = page.getData(); + byte[] expectedBytes = expectedData.toByteArray(); + byte[] actualBytes = actualData.toByteArray(); + assertArrayEquals("Data content mismatch", expectedBytes, actualBytes); + } + } } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDataPageChecksums.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDataPageChecksums.java index 013498c2b4..bd1a64794e 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDataPageChecksums.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDataPageChecksums.java @@ -147,12 +147,11 @@ private Path writeSimpleParquetFile( CodecFactory codecFactory = new CodecFactory(conf, PAGE_SIZE); BytesInputCompressor compressor = codecFactory.getCompressor(compression); - ColumnChunkPageWriteStore writeStore = new ColumnChunkPageWriteStore( - compressor, - schemaSimple, - new HeapByteBufferAllocator(), - Integer.MAX_VALUE, - ParquetOutputFormat.getPageWriteChecksumEnabled(conf)); + ColumnChunkPageWriteStore writeStore = ColumnChunkPageWriteStore.build( + compressor, schemaSimple, new HeapByteBufferAllocator()) + .withColumnIndexTruncateLength(Integer.MAX_VALUE) + .withPageWriteChecksumEnabled(ParquetOutputFormat.getPageWriteChecksumEnabled(conf)) + .build(); if (version == ParquetProperties.WriterVersion.PARQUET_1_0) { PageWriter pageWriter = writeStore.getPageWriter(colADesc); @@ -281,6 +280,7 @@ private Path writeNestedWithNullsSampleParquetFile( .withType(schemaNestedWithNulls) .withPageWriteChecksumEnabled(ParquetOutputFormat.getPageWriteChecksumEnabled(conf)) .withWriterVersion(version) + 
.withPageCompressThreshold(100) // threshold above any possible ratio, so pages are always kept compressed .build()) { GroupFactory groupFactory = new SimpleGroupFactory(schemaNestedWithNulls); Random rand = new Random(42);
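
Reviewer note: a minimal usage sketch of the new knob through the public ParquetWriter builder added in this patch. Only withPageCompressThreshold(double) and ParquetProperties.DEFAULT_PAGE_COMPRESS_THRESHOLD (0.98) come from the diff above; ExampleParquetWriter, the schema, the output path, the GZIP codec, and the 0.9 threshold are illustrative assumptions, not part of the change.

import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class PageCompressThresholdSketch {
  public static void main(String[] args) throws Exception {
    MessageType schema =
        MessageTypeParser.parseMessageType("message example { required binary payload; }");

    // compressed size / uncompressed size must stay at or below the threshold for the
    // compressed bytes to be kept; 0.9 keeps a V2 page compressed only if it saves at least 10%.
    // The default, ParquetProperties.DEFAULT_PAGE_COMPRESS_THRESHOLD, is 0.98.
    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(new Path("/tmp/example.parquet"))
        .withType(schema)
        .withCompressionCodec(CompressionCodecName.GZIP)
        .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0) // fallback only applies to V2 pages
        .withPageCompressThreshold(0.9)
        .build()) {
      SimpleGroupFactory groups = new SimpleGroupFactory(schema);
      writer.write(groups.newGroup().append("payload", "nearly incompressible payload"));
    }
  }
}

Because the uncompressed fallback is only implemented in writePageV2, the sketch pins the writer version to PARQUET_2_0; V1 pages keep the existing behavior regardless of the threshold.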