diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
index 0cc05d6d75..bd5b7b281c 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
@@ -129,6 +129,7 @@ public void close() throws IOException, InterruptedException {
     if (!closed) {
       try {
         if (aborted) {
+          parquetFileWriter.abort();
           return;
         }
         flushRowGroupToStore();
@@ -140,6 +141,9 @@ public void close() throws IOException, InterruptedException {
         }
         finalMetadata.putAll(finalWriteContext.getExtraMetaData());
         parquetFileWriter.end(finalMetadata);
+      } catch (Exception e) {
+        parquetFileWriter.abort();
+        throw e;
       } finally {
         AutoCloseables.uncheckedClose(columnStore, pageStore, bloomFilterWriteStore, parquetFileWriter);
         closed = true;
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index f0a912f599..cd7f1bc2c3 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -171,6 +171,7 @@ public static enum Mode {
 
     // set when end is called
     private ParquetMetadata footer = null;
+    private boolean aborted;
     private boolean closed;
 
     private final CRC32 crc;
@@ -333,6 +334,34 @@ public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode, long ro
                 ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED);
     }
 
+    @FunctionalInterface
+    interface IOCallable<T> {
+        T call() throws IOException;
+    }
+
+    private <T> T withAbortOnFailure(IOCallable<T> action) throws IOException {
+        try {
+            return action.call();
+        } catch (IOException e) {
+            aborted = true;
+            throw e;
+        }
+    }
+
+    @FunctionalInterface
+    interface IORunnable {
+        void run() throws IOException;
+    }
+
+    private void withAbortOnFailure(IORunnable action) throws IOException {
+        try {
+            action.run();
+        } catch (IOException e) {
+            aborted = true;
+            throw e;
+        }
+    }
+
     /**
      * @param file OutputFile to create or overwrite
      * @param schema the schema of the data
@@ -563,13 +592,15 @@ private ParquetFileWriter(
      * @throws IOException if there is an error while writing
      */
     public void start() throws IOException {
-        state = state.start();
-        LOG.debug("{}: start", out.getPos());
-        byte[] magic = MAGIC;
-        if (null != fileEncryptor && fileEncryptor.isFooterEncrypted()) {
-            magic = EFMAGIC;
-        }
-        out.write(magic);
+        withAbortOnFailure(() -> {
+            state = state.start();
+            LOG.debug("{}: start", out.getPos());
+            byte[] magic = MAGIC;
+            if (null != fileEncryptor && fileEncryptor.isFooterEncrypted()) {
+                magic = EFMAGIC;
+            }
+            out.write(magic);
+        });
     }
 
     public InternalFileEncryptor getEncryptor() {
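The IOCallable/IORunnable pair and the two withAbortOnFailure overloads introduced above are the heart of the patch: every write path below is routed through them, so any IOException flips the aborted flag before being rethrown, and a later close() can tell that the output must not be finalized. The generic IOCallable<T> overload exists for the few call sites that return a value, such as getPos() and getNextRowGroupSize(). A minimal standalone sketch of the same idea (illustrative names, not Parquet classes); note that ParquetFileWriter itself only traps IOException here, while InternalParquetRecordWriter.close() separately calls abort() for any Exception raised during its own flush-and-end sequence:

// Standalone sketch (illustrative only, not part of the patch) of the
// abort-on-failure wrapper pattern used throughout ParquetFileWriter.
import java.io.IOException;

class AbortOnFailureSketch {
    @FunctionalInterface
    interface IORunnable {
        void run() throws IOException;
    }

    private boolean aborted;

    private void withAbortOnFailure(IORunnable action) throws IOException {
        try {
            action.run();
        } catch (IOException e) {
            aborted = true; // remember the failure; the caller still sees the original exception
            throw e;
        }
    }

    boolean isAborted() {
        return aborted;
    }

    void failingWrite() throws IOException {
        withAbortOnFailure(() -> {
            throw new IOException("simulated write failure");
        });
    }
}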
@@ -583,19 +614,21 @@ public InternalFileEncryptor getEncryptor() {
      * @throws IOException if there is an error while writing
      */
     public void startBlock(long recordCount) throws IOException {
-        state = state.startBlock();
-        LOG.debug("{}: start block", out.getPos());
-        // out.write(MAGIC); // TODO: add a magic delimiter
+        withAbortOnFailure(() -> {
+            state = state.startBlock();
+            LOG.debug("{}: start block", out.getPos());
+            // out.write(MAGIC); // TODO: add a magic delimiter
 
-        alignment.alignForRowGroup(out);
+            alignment.alignForRowGroup(out);
 
-        currentBlock = new BlockMetaData();
-        currentRecordCount = recordCount;
+            currentBlock = new BlockMetaData();
+            currentRecordCount = recordCount;
 
-        currentColumnIndexes = new ArrayList<>();
-        currentOffsetIndexes = new ArrayList<>();
+            currentColumnIndexes = new ArrayList<>();
+            currentOffsetIndexes = new ArrayList<>();
 
-        currentBloomFilters = new HashMap<>();
+            currentBloomFilters = new HashMap<>();
+        });
     }
 
     /**
@@ -608,26 +641,29 @@ public void startBlock(long recordCount) throws IOException {
      */
     public void startColumn(ColumnDescriptor descriptor, long valueCount, CompressionCodecName compressionCodecName)
             throws IOException {
-        state = state.startColumn();
-        encodingStatsBuilder.clear();
-        currentEncodings = new HashSet<Encoding>();
-        currentChunkPath = ColumnPath.get(descriptor.getPath());
-        currentChunkType = descriptor.getPrimitiveType();
-        currentChunkCodec = compressionCodecName;
-        currentChunkValueCount = valueCount;
-        currentChunkFirstDataPage = -1;
-        compressedLength = 0;
-        uncompressedLength = 0;
-        // The statistics will be copied from the first one added at writeDataPage(s) so we have the correct typed one
-        currentStatistics = null;
-        currentSizeStatistics = SizeStatistics.newBuilder(
-                        descriptor.getPrimitiveType(),
-                        descriptor.getMaxRepetitionLevel(),
-                        descriptor.getMaxDefinitionLevel())
-                .build();
-
-        columnIndexBuilder = ColumnIndexBuilder.getBuilder(currentChunkType, columnIndexTruncateLength);
-        offsetIndexBuilder = OffsetIndexBuilder.getBuilder();
+        withAbortOnFailure(() -> {
+            state = state.startColumn();
+            encodingStatsBuilder.clear();
+            currentEncodings = new HashSet<Encoding>();
+            currentChunkPath = ColumnPath.get(descriptor.getPath());
+            currentChunkType = descriptor.getPrimitiveType();
+            currentChunkCodec = compressionCodecName;
+            currentChunkValueCount = valueCount;
+            currentChunkFirstDataPage = -1;
+            compressedLength = 0;
+            uncompressedLength = 0;
+            // The statistics will be copied from the first one added at writeDataPage(s) so we have the correct typed
+            // one
+            currentStatistics = null;
+            currentSizeStatistics = SizeStatistics.newBuilder(
+                            descriptor.getPrimitiveType(),
+                            descriptor.getMaxRepetitionLevel(),
+                            descriptor.getMaxDefinitionLevel())
+                    .build();
+
+            columnIndexBuilder = ColumnIndexBuilder.getBuilder(currentChunkType, columnIndexTruncateLength);
+            offsetIndexBuilder = OffsetIndexBuilder.getBuilder();
+        });
     }
 
     /**
@@ -637,45 +673,51 @@ public void startColumn(ColumnDescriptor descriptor, long valueCount, Compressio
      * @throws IOException if there is an error while writing
      */
     public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException {
-        writeDictionaryPage(dictionaryPage, null, null);
+        withAbortOnFailure(() -> {
+            writeDictionaryPage(dictionaryPage, null, null);
+        });
     }
 
     public void writeDictionaryPage(
             DictionaryPage dictionaryPage, BlockCipher.Encryptor headerBlockEncryptor, byte[] AAD) throws IOException {
-        state = state.write();
-        LOG.debug("{}: write dictionary page: {} values", out.getPos(), dictionaryPage.getDictionarySize());
-        currentChunkDictionaryPageOffset = out.getPos();
-        int uncompressedSize = dictionaryPage.getUncompressedSize();
-        int compressedPageSize = Math.toIntExact(dictionaryPage.getBytes().size());
-        if (pageWriteChecksumEnabled) {
-            crc.reset();
-            crcUpdate(dictionaryPage.getBytes());
-            metadataConverter.writeDictionaryPageHeader(
-                    uncompressedSize,
-                    compressedPageSize,
-                    dictionaryPage.getDictionarySize(),
-                    dictionaryPage.getEncoding(),
-                    (int) crc.getValue(),
-                    out,
-                    headerBlockEncryptor,
-                    AAD);
-        } else {
-            metadataConverter.writeDictionaryPageHeader(
-                    uncompressedSize,
-                    compressedPageSize,
-                    dictionaryPage.getDictionarySize(),
-                    dictionaryPage.getEncoding(),
-                    out,
-                    headerBlockEncryptor,
-                    AAD);
-        }
-        long headerSize = out.getPos() - currentChunkDictionaryPageOffset;
-        this.uncompressedLength += uncompressedSize + headerSize;
-        this.compressedLength += compressedPageSize + headerSize;
-        LOG.debug("{}: write dictionary page content {}", out.getPos(), compressedPageSize);
-        dictionaryPage.getBytes().writeAllTo(out); // for encrypted column, dictionary page bytes are already encrypted
-        encodingStatsBuilder.addDictEncoding(dictionaryPage.getEncoding());
-        currentEncodings.add(dictionaryPage.getEncoding());
+        withAbortOnFailure(() -> {
+            state = state.write();
+            LOG.debug("{}: write dictionary page: {} values", out.getPos(), dictionaryPage.getDictionarySize());
+            currentChunkDictionaryPageOffset = out.getPos();
+            int uncompressedSize = dictionaryPage.getUncompressedSize();
+            int compressedPageSize = Math.toIntExact(dictionaryPage.getBytes().size());
+            if (pageWriteChecksumEnabled) {
+                crc.reset();
+                crcUpdate(dictionaryPage.getBytes());
+                metadataConverter.writeDictionaryPageHeader(
+                        uncompressedSize,
+                        compressedPageSize,
+                        dictionaryPage.getDictionarySize(),
+                        dictionaryPage.getEncoding(),
+                        (int) crc.getValue(),
+                        out,
+                        headerBlockEncryptor,
+                        AAD);
+            } else {
+                metadataConverter.writeDictionaryPageHeader(
+                        uncompressedSize,
+                        compressedPageSize,
+                        dictionaryPage.getDictionarySize(),
+                        dictionaryPage.getEncoding(),
+                        out,
+                        headerBlockEncryptor,
+                        AAD);
+            }
+            long headerSize = out.getPos() - currentChunkDictionaryPageOffset;
+            this.uncompressedLength += uncompressedSize + headerSize;
+            this.compressedLength += compressedPageSize + headerSize;
+            LOG.debug("{}: write dictionary page content {}", out.getPos(), compressedPageSize);
+            dictionaryPage
+                    .getBytes()
+                    .writeAllTo(out); // for encrypted column, dictionary page bytes are already encrypted
+            encodingStatsBuilder.addDictEncoding(dictionaryPage.getEncoding());
+            currentEncodings.add(dictionaryPage.getEncoding());
+        });
     }
 
     /**
@@ -867,22 +909,24 @@ public void writeDataPage(
             byte[] pageHeaderAAD,
             SizeStatistics sizeStatistics)
             throws IOException {
-        long beforeHeader = out.getPos();
-        innerWriteDataPage(
-                valueCount,
-                uncompressedPageSize,
-                bytes,
-                statistics,
-                rlEncoding,
-                dlEncoding,
-                valuesEncoding,
-                metadataBlockEncryptor,
-                pageHeaderAAD,
-                sizeStatistics);
-        offsetIndexBuilder.add(
-                toIntWithCheck(out.getPos() - beforeHeader, "page"),
-                rowCount,
-                sizeStatistics != null ? sizeStatistics.getUnencodedByteArrayDataBytes() : Optional.empty());
+        withAbortOnFailure(() -> {
+            long beforeHeader = out.getPos();
+            innerWriteDataPage(
+                    valueCount,
+                    uncompressedPageSize,
+                    bytes,
+                    statistics,
+                    rlEncoding,
+                    dlEncoding,
+                    valuesEncoding,
+                    metadataBlockEncryptor,
+                    pageHeaderAAD,
+                    sizeStatistics);
+            offsetIndexBuilder.add(
+                    toIntWithCheck(out.getPos() - beforeHeader, "page"),
+                    rowCount,
+                    sizeStatistics != null ? sizeStatistics.getUnencodedByteArrayDataBytes() : Optional.empty());
+        });
     }
 
     private void innerWriteDataPage(
@@ -974,51 +1018,53 @@ public void writeDataPage(
             byte[] pageHeaderAAD,
             SizeStatistics sizeStatistics)
             throws IOException {
-        state = state.write();
-        long beforeHeader = out.getPos();
-        if (currentChunkFirstDataPage < 0) {
-            currentChunkFirstDataPage = beforeHeader;
-        }
-        LOG.debug("{}: write data page: {} values", beforeHeader, valueCount);
-        int compressedPageSize = toIntWithCheck(bytes.size(), "page");
-        if (pageWriteChecksumEnabled) {
-            crc.reset();
-            crcUpdate(bytes);
-            metadataConverter.writeDataPageV1Header(
-                    uncompressedPageSize,
-                    compressedPageSize,
-                    valueCount,
-                    rlEncoding,
-                    dlEncoding,
-                    valuesEncoding,
-                    (int) crc.getValue(),
-                    out,
-                    metadataBlockEncryptor,
-                    pageHeaderAAD);
-        } else {
-            metadataConverter.writeDataPageV1Header(
-                    uncompressedPageSize,
-                    compressedPageSize,
-                    valueCount,
-                    rlEncoding,
-                    dlEncoding,
-                    valuesEncoding,
-                    out,
-                    metadataBlockEncryptor,
-                    pageHeaderAAD);
-        }
-        long headerSize = out.getPos() - beforeHeader;
-        this.uncompressedLength += uncompressedPageSize + headerSize;
-        this.compressedLength += compressedPageSize + headerSize;
-        LOG.debug("{}: write data page content {}", out.getPos(), compressedPageSize);
-        bytes.writeAllTo(out);
+        withAbortOnFailure(() -> {
+            state = state.write();
+            long beforeHeader = out.getPos();
+            if (currentChunkFirstDataPage < 0) {
+                currentChunkFirstDataPage = beforeHeader;
+            }
+            LOG.debug("{}: write data page: {} values", beforeHeader, valueCount);
+            int compressedPageSize = toIntWithCheck(bytes.size(), "page");
+            if (pageWriteChecksumEnabled) {
+                crc.reset();
+                crcUpdate(bytes);
+                metadataConverter.writeDataPageV1Header(
+                        uncompressedPageSize,
+                        compressedPageSize,
+                        valueCount,
+                        rlEncoding,
+                        dlEncoding,
+                        valuesEncoding,
+                        (int) crc.getValue(),
+                        out,
+                        metadataBlockEncryptor,
+                        pageHeaderAAD);
+            } else {
+                metadataConverter.writeDataPageV1Header(
+                        uncompressedPageSize,
+                        compressedPageSize,
+                        valueCount,
+                        rlEncoding,
+                        dlEncoding,
+                        valuesEncoding,
+                        out,
+                        metadataBlockEncryptor,
+                        pageHeaderAAD);
+            }
+            long headerSize = out.getPos() - beforeHeader;
+            this.uncompressedLength += uncompressedPageSize + headerSize;
+            this.compressedLength += compressedPageSize + headerSize;
+            LOG.debug("{}: write data page content {}", out.getPos(), compressedPageSize);
+            bytes.writeAllTo(out);
 
-        mergeColumnStatistics(statistics, sizeStatistics);
+            mergeColumnStatistics(statistics, sizeStatistics);
 
-        encodingStatsBuilder.addDataEncoding(valuesEncoding);
-        currentEncodings.add(rlEncoding);
-        currentEncodings.add(dlEncoding);
-        currentEncodings.add(valuesEncoding);
+            encodingStatsBuilder.addDataEncoding(valuesEncoding);
+            currentEncodings.add(rlEncoding);
+            currentEncodings.add(dlEncoding);
+            currentEncodings.add(valuesEncoding);
+        });
     }
 
     /**
@@ -1146,75 +1192,78 @@ public void writeDataPageV2(
             byte[] pageHeaderAAD,
             SizeStatistics sizeStatistics)
             throws IOException {
-        state = state.write();
-        int rlByteLength = toIntWithCheck(repetitionLevels.size(), "page repetition levels");
-        int dlByteLength = toIntWithCheck(definitionLevels.size(), "page definition levels");
+        withAbortOnFailure(() -> {
+            state = state.write();
+            int rlByteLength = toIntWithCheck(repetitionLevels.size(), "page repetition levels");
+            int dlByteLength = toIntWithCheck(definitionLevels.size(), "page definition levels");
 
-        int compressedSize =
-                toIntWithCheck(compressedData.size() + repetitionLevels.size() + definitionLevels.size(), "page");
+            int compressedSize =
+                    toIntWithCheck(compressedData.size() + repetitionLevels.size() + definitionLevels.size(), "page");
 
-        int uncompressedSize =
-                toIntWithCheck(uncompressedDataSize + repetitionLevels.size() + definitionLevels.size(), "page");
+            int uncompressedSize =
+                    toIntWithCheck(uncompressedDataSize + repetitionLevels.size() + definitionLevels.size(), "page");
 
-        long beforeHeader = out.getPos();
-        if (currentChunkFirstDataPage < 0) {
-            currentChunkFirstDataPage = beforeHeader;
-        }
-
-        if (pageWriteChecksumEnabled) {
-            crc.reset();
-            if (repetitionLevels.size() > 0) {
-                crcUpdate(repetitionLevels);
-            }
-            if (definitionLevels.size() > 0) {
-                crcUpdate(definitionLevels);
+            long beforeHeader = out.getPos();
+            if (currentChunkFirstDataPage < 0) {
+                currentChunkFirstDataPage = beforeHeader;
             }
-            if (compressedData.size() > 0) {
-                crcUpdate(compressedData);
+
+            if (pageWriteChecksumEnabled) {
+                crc.reset();
+                if (repetitionLevels.size() > 0) {
+                    crcUpdate(repetitionLevels);
+                }
+                if (definitionLevels.size() > 0) {
+                    crcUpdate(definitionLevels);
+                }
+                if (compressedData.size() > 0) {
+                    crcUpdate(compressedData);
+                }
+                metadataConverter.writeDataPageV2Header(
+                        uncompressedSize,
+                        compressedSize,
+                        valueCount,
+                        nullCount,
+                        rowCount,
+                        dataEncoding,
+                        rlByteLength,
+                        dlByteLength,
+                        (int) crc.getValue(),
+                        out,
+                        metadataBlockEncryptor,
+                        pageHeaderAAD);
+            } else {
+                metadataConverter.writeDataPageV2Header(
+                        uncompressedSize,
+                        compressedSize,
+                        valueCount,
+                        nullCount,
+                        rowCount,
+                        dataEncoding,
+                        rlByteLength,
+                        dlByteLength,
+                        out,
+                        metadataBlockEncryptor,
+                        pageHeaderAAD);
             }
-            metadataConverter.writeDataPageV2Header(
-                    uncompressedSize,
-                    compressedSize,
-                    valueCount,
-                    nullCount,
-                    rowCount,
-                    dataEncoding,
-                    rlByteLength,
-                    dlByteLength,
-                    (int) crc.getValue(),
-                    out,
-                    metadataBlockEncryptor,
-                    pageHeaderAAD);
-        } else {
-            metadataConverter.writeDataPageV2Header(
-                    uncompressedSize,
-                    compressedSize,
-                    valueCount,
-                    nullCount,
-                    rowCount,
-                    dataEncoding,
-                    rlByteLength,
-                    dlByteLength,
-                    out,
-                    metadataBlockEncryptor,
-                    pageHeaderAAD);
-        }
-        long headersSize = out.getPos() - beforeHeader;
-        this.uncompressedLength += uncompressedSize + headersSize;
-        this.compressedLength += compressedSize + headersSize;
+            long headersSize = out.getPos() - beforeHeader;
+            this.uncompressedLength += uncompressedSize + headersSize;
+            this.compressedLength += compressedSize + headersSize;
 
-        mergeColumnStatistics(statistics, sizeStatistics);
+            mergeColumnStatistics(statistics, sizeStatistics);
 
-        currentEncodings.add(dataEncoding);
-        encodingStatsBuilder.addDataEncoding(dataEncoding);
+            currentEncodings.add(dataEncoding);
+            encodingStatsBuilder.addDataEncoding(dataEncoding);
 
-        BytesInput.concat(repetitionLevels, definitionLevels, compressedData).writeAllTo(out);
+            BytesInput.concat(repetitionLevels, definitionLevels, compressedData)
+                    .writeAllTo(out);
 
-        offsetIndexBuilder.add(
-                toIntWithCheck(out.getPos() - beforeHeader, "page"),
-                rowCount,
-                sizeStatistics != null ? sizeStatistics.getUnencodedByteArrayDataBytes() : Optional.empty());
+            offsetIndexBuilder.add(
+                    toIntWithCheck(out.getPos() - beforeHeader, "page"),
+                    rowCount,
+                    sizeStatistics != null ? sizeStatistics.getUnencodedByteArrayDataBytes() : Optional.empty());
+        });
     }
 
     private void crcUpdate(BytesInput bytes) {
@@ -1302,57 +1351,60 @@ void writeColumnChunk(
             int columnOrdinal,
             byte[] fileAAD)
             throws IOException {
-        startColumn(descriptor, valueCount, compressionCodecName);
-
-        state = state.write();
-        if (dictionaryPage != null) {
-            byte[] dictonaryPageHeaderAAD = null;
-            if (null != headerBlockEncryptor) {
-                dictonaryPageHeaderAAD = AesCipher.createModuleAAD(
-                        fileAAD, ModuleType.DictionaryPageHeader, rowGroupOrdinal, columnOrdinal, -1);
+        withAbortOnFailure(() -> {
+            startColumn(descriptor, valueCount, compressionCodecName);
+
+            state = state.write();
+            if (dictionaryPage != null) {
+                byte[] dictonaryPageHeaderAAD = null;
+                if (null != headerBlockEncryptor) {
+                    dictonaryPageHeaderAAD = AesCipher.createModuleAAD(
+                            fileAAD, ModuleType.DictionaryPageHeader, rowGroupOrdinal, columnOrdinal, -1);
+                }
+                writeDictionaryPage(dictionaryPage, headerBlockEncryptor, dictonaryPageHeaderAAD);
             }
-            writeDictionaryPage(dictionaryPage, headerBlockEncryptor, dictonaryPageHeaderAAD);
-        }
-        if (bloomFilter != null) {
-            // write bloom filter if one of data pages is not dictionary encoded
-            boolean isWriteBloomFilter = false;
-            for (Encoding encoding : dataEncodings) {
-                // dictionary encoding: `PLAIN_DICTIONARY` is used in parquet v1, `RLE_DICTIONARY` is used in parquet v2
-                if (encoding != Encoding.PLAIN_DICTIONARY && encoding != Encoding.RLE_DICTIONARY) {
-                    isWriteBloomFilter = true;
-                    break;
+            if (bloomFilter != null) {
+                // write bloom filter if one of data pages is not dictionary encoded
+                boolean isWriteBloomFilter = false;
+                for (Encoding encoding : dataEncodings) {
+                    // dictionary encoding: `PLAIN_DICTIONARY` is used in parquet v1, `RLE_DICTIONARY` is used in
+                    // parquet v2
+                    if (encoding != Encoding.PLAIN_DICTIONARY && encoding != Encoding.RLE_DICTIONARY) {
+                        isWriteBloomFilter = true;
+                        break;
+                    }
+                }
+                if (isWriteBloomFilter) {
+                    currentBloomFilters.put(String.join(".", descriptor.getPath()), bloomFilter);
+                } else {
+                    LOG.info(
+                            "No need to write bloom filter because column {} data pages are all encoded as dictionary.",
+                            descriptor.getPath());
                 }
             }
-            if (isWriteBloomFilter) {
-                currentBloomFilters.put(String.join(".", descriptor.getPath()), bloomFilter);
-            } else {
-                LOG.info(
-                        "No need to write bloom filter because column {} data pages are all encoded as dictionary.",
-                        descriptor.getPath());
+            LOG.debug("{}: write data pages", out.getPos());
+            long headersSize = bytes.size() - compressedTotalPageSize;
+            this.uncompressedLength += uncompressedTotalPageSize + headersSize;
+            this.compressedLength += compressedTotalPageSize + headersSize;
+            LOG.debug("{}: write data pages content", out.getPos());
+            currentChunkFirstDataPage = out.getPos();
+            bytes.writeAllTo(out);
+            encodingStatsBuilder.addDataEncodings(dataEncodings);
+            if (rlEncodings.isEmpty()) {
+                encodingStatsBuilder.withV2Pages();
             }
-        }
-        LOG.debug("{}: write data pages", out.getPos());
-        long headersSize = bytes.size() - compressedTotalPageSize;
-        this.uncompressedLength += uncompressedTotalPageSize + headersSize;
-        this.compressedLength += compressedTotalPageSize + headersSize;
-        LOG.debug("{}: write data pages content", out.getPos());
-        currentChunkFirstDataPage = out.getPos();
-        bytes.writeAllTo(out);
-        encodingStatsBuilder.addDataEncodings(dataEncodings);
-        if (rlEncodings.isEmpty()) {
-            encodingStatsBuilder.withV2Pages();
-        }
-        currentEncodings.addAll(rlEncodings);
-        currentEncodings.addAll(dlEncodings);
-        currentEncodings.addAll(dataEncodings);
-        currentStatistics = totalStats;
-        currentSizeStatistics = totalSizeStats;
+            currentEncodings.addAll(rlEncodings);
+            currentEncodings.addAll(dlEncodings);
+            currentEncodings.addAll(dataEncodings);
+            currentStatistics = totalStats;
+            currentSizeStatistics = totalSizeStats;
 
-        this.columnIndexBuilder = columnIndexBuilder;
-        this.offsetIndexBuilder = offsetIndexBuilder;
+            this.columnIndexBuilder = columnIndexBuilder;
+            this.offsetIndexBuilder = offsetIndexBuilder;
 
-        endColumn();
+            endColumn();
+        });
     }
 
     /**
@@ -1374,33 +1426,35 @@ public void invalidateStatistics(Statistics<?> totalStatistics) {
      * @throws IOException if there is an error while writing
      */
     public void endColumn() throws IOException {
-        state = state.endColumn();
-        LOG.debug("{}: end column", out.getPos());
-        if (columnIndexBuilder.getMinMaxSize() > columnIndexBuilder.getPageCount() * MAX_STATS_SIZE) {
-            currentColumnIndexes.add(null);
-        } else {
-            currentColumnIndexes.add(columnIndexBuilder.build());
-        }
-        currentOffsetIndexes.add(offsetIndexBuilder.build(currentChunkFirstDataPage));
-        currentBlock.addColumn(ColumnChunkMetaData.get(
-                currentChunkPath,
-                currentChunkType,
-                currentChunkCodec,
-                encodingStatsBuilder.build(),
-                currentEncodings,
-                currentStatistics,
-                currentChunkFirstDataPage,
-                currentChunkDictionaryPageOffset,
-                currentChunkValueCount,
-                compressedLength,
-                uncompressedLength,
-                currentSizeStatistics));
-        this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength);
-        this.uncompressedLength = 0;
-        this.compressedLength = 0;
-        this.currentChunkDictionaryPageOffset = 0;
-        columnIndexBuilder = null;
-        offsetIndexBuilder = null;
+        withAbortOnFailure(() -> {
+            state = state.endColumn();
+            LOG.debug("{}: end column", out.getPos());
+            if (columnIndexBuilder.getMinMaxSize() > columnIndexBuilder.getPageCount() * MAX_STATS_SIZE) {
+                currentColumnIndexes.add(null);
+            } else {
+                currentColumnIndexes.add(columnIndexBuilder.build());
+            }
+            currentOffsetIndexes.add(offsetIndexBuilder.build(currentChunkFirstDataPage));
+            currentBlock.addColumn(ColumnChunkMetaData.get(
+                    currentChunkPath,
+                    currentChunkType,
+                    currentChunkCodec,
+                    encodingStatsBuilder.build(),
+                    currentEncodings,
+                    currentStatistics,
+                    currentChunkFirstDataPage,
+                    currentChunkDictionaryPageOffset,
+                    currentChunkValueCount,
+                    compressedLength,
+                    uncompressedLength,
+                    currentSizeStatistics));
+            this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength);
+            this.uncompressedLength = 0;
+            this.compressedLength = 0;
+            this.currentChunkDictionaryPageOffset = 0;
+            columnIndexBuilder = null;
+            offsetIndexBuilder = null;
+        });
    }
 
     /**
@@ -1409,22 +1463,24 @@ public void endColumn() throws IOException {
      * @throws IOException if there is an error while writing
      */
     public void endBlock() throws IOException {
-        if (currentRecordCount == 0) {
-            throw new ParquetEncodingException("End block with zero record");
-        }
+        withAbortOnFailure(() -> {
+            if (currentRecordCount == 0) {
+                throw new ParquetEncodingException("End block with zero record");
+            }
 
-        state = state.endBlock();
-        LOG.debug("{}: end block", out.getPos());
-        currentBlock.setRowCount(currentRecordCount);
-        currentBlock.setOrdinal(blocks.size());
-        blocks.add(currentBlock);
-        columnIndexes.add(currentColumnIndexes);
-        offsetIndexes.add(currentOffsetIndexes);
-        bloomFilters.add(currentBloomFilters);
-        currentColumnIndexes = null;
-        currentOffsetIndexes = null;
-        currentBloomFilters = null;
-        currentBlock = null;
+            state = state.endBlock();
+            LOG.debug("{}: end block", out.getPos());
+            currentBlock.setRowCount(currentRecordCount);
+            currentBlock.setOrdinal(blocks.size());
+            blocks.add(currentBlock);
+            columnIndexes.add(currentColumnIndexes);
+            offsetIndexes.add(currentOffsetIndexes);
+            bloomFilters.add(currentBloomFilters);
+            currentColumnIndexes = null;
+            currentOffsetIndexes = null;
+            currentBloomFilters = null;
+            currentBlock = null;
+        });
     }
 
     /**
@@ -1441,9 +1497,11 @@ public void appendFile(Configuration conf, Path file) throws IOException {
     }
 
     public void appendFile(InputFile file) throws IOException {
-        try (ParquetFileReader reader = ParquetFileReader.open(file)) {
-            reader.appendTo(this);
-        }
+        withAbortOnFailure(() -> {
+            try (ParquetFileReader reader = ParquetFileReader.open(file)) {
+                reader.appendTo(this);
+            }
+        });
     }
 
     /**
@@ -1462,9 +1520,11 @@ public void appendRowGroups(FSDataInputStream file, List<BlockMetaData> rowGroup
      */
     public void appendRowGroups(SeekableInputStream file, List<BlockMetaData> rowGroups, boolean dropColumns)
             throws IOException {
-        for (BlockMetaData block : rowGroups) {
-            appendRowGroup(file, block, dropColumns);
-        }
+        withAbortOnFailure(() -> {
+            for (BlockMetaData block : rowGroups) {
+                appendRowGroup(file, block, dropColumns);
+            }
+        });
     }
 
     /**
@@ -1482,83 +1542,86 @@ public void appendRowGroup(FSDataInputStream from, BlockMetaData rowGroup, boole
     public void appendRowGroup(SeekableInputStream from, BlockMetaData rowGroup, boolean dropColumns)
             throws IOException {
-        startBlock(rowGroup.getRowCount());
-
-        Map<String, ColumnChunkMetaData> columnsToCopy = new HashMap<String, ColumnChunkMetaData>();
-        for (ColumnChunkMetaData chunk : rowGroup.getColumns()) {
-            columnsToCopy.put(chunk.getPath().toDotString(), chunk);
-        }
+        withAbortOnFailure(() -> {
+            startBlock(rowGroup.getRowCount());
 
-        List<ColumnChunkMetaData> columnsInOrder = new ArrayList<ColumnChunkMetaData>();
-
-        for (ColumnDescriptor descriptor : schema.getColumns()) {
-            String path = ColumnPath.get(descriptor.getPath()).toDotString();
-            ColumnChunkMetaData chunk = columnsToCopy.remove(path);
-            if (chunk != null) {
-                columnsInOrder.add(chunk);
-            } else {
-                throw new IllegalArgumentException(
-                        String.format("Missing column '%s', cannot copy row group: %s", path, rowGroup));
+            Map<String, ColumnChunkMetaData> columnsToCopy = new HashMap<String, ColumnChunkMetaData>();
+            for (ColumnChunkMetaData chunk : rowGroup.getColumns()) {
+                columnsToCopy.put(chunk.getPath().toDotString(), chunk);
             }
-        }
 
-        // complain if some columns would be dropped and that's not okay
-        if (!dropColumns && !columnsToCopy.isEmpty()) {
-            throw new IllegalArgumentException(String.format(
-                    "Columns cannot be copied (missing from target schema): %s",
-                    String.join(", ", columnsToCopy.keySet())));
-        }
-
-        // copy the data for all chunks
-        long start = -1;
-        long length = 0;
-        long blockUncompressedSize = 0L;
-        for (int i = 0; i < columnsInOrder.size(); i += 1) {
-            ColumnChunkMetaData chunk = columnsInOrder.get(i);
-
-            // get this chunk's start position in the new file
-            long newChunkStart = out.getPos() + length;
+            List<ColumnChunkMetaData> columnsInOrder = new ArrayList<ColumnChunkMetaData>();
 
-            // add this chunk to be copied with any previous chunks
-            if (start < 0) {
-                // no previous chunk included, start at this chunk's starting pos
-                start = chunk.getStartingPos();
+            for (ColumnDescriptor descriptor : schema.getColumns()) {
+                String path = ColumnPath.get(descriptor.getPath()).toDotString();
+                ColumnChunkMetaData chunk = columnsToCopy.remove(path);
+                if (chunk != null) {
+                    columnsInOrder.add(chunk);
+                } else {
+                    throw new IllegalArgumentException(
+                            String.format("Missing column '%s', cannot copy row group: %s", path, rowGroup));
+                }
             }
-            length += chunk.getTotalSize();
-
-            if ((i + 1) == columnsInOrder.size() || columnsInOrder.get(i + 1).getStartingPos() != (start + length)) {
-                // not contiguous. do the copy now.
-                copy(from, out, start, length);
-                // reset to start at the next column chunk
-                start = -1;
-                length = 0;
+
+            // complain if some columns would be dropped and that's not okay
+            if (!dropColumns && !columnsToCopy.isEmpty()) {
+                throw new IllegalArgumentException(String.format(
+                        "Columns cannot be copied (missing from target schema): %s",
+                        String.join(", ", columnsToCopy.keySet())));
            }
 
-            // TODO: column/offset indexes are not copied
-            // (it would require seeking to the end of the file for each row groups)
-            currentColumnIndexes.add(null);
-            currentOffsetIndexes.add(null);
+            // copy the data for all chunks
+            long start = -1;
+            long length = 0;
+            long blockUncompressedSize = 0L;
+            for (int i = 0; i < columnsInOrder.size(); i += 1) {
+                ColumnChunkMetaData chunk = columnsInOrder.get(i);
 
-            Offsets offsets = Offsets.getOffsets(from, chunk, newChunkStart);
-            currentBlock.addColumn(ColumnChunkMetaData.get(
-                    chunk.getPath(),
-                    chunk.getPrimitiveType(),
-                    chunk.getCodec(),
-                    chunk.getEncodingStats(),
-                    chunk.getEncodings(),
-                    chunk.getStatistics(),
-                    offsets.firstDataPageOffset,
-                    offsets.dictionaryPageOffset,
-                    chunk.getValueCount(),
-                    chunk.getTotalSize(),
-                    chunk.getTotalUncompressedSize()));
+                // get this chunk's start position in the new file
+                long newChunkStart = out.getPos() + length;
 
-            blockUncompressedSize += chunk.getTotalUncompressedSize();
-        }
+                // add this chunk to be copied with any previous chunks
+                if (start < 0) {
+                    // no previous chunk included, start at this chunk's starting pos
+                    start = chunk.getStartingPos();
+                }
+                length += chunk.getTotalSize();
+
+                if ((i + 1) == columnsInOrder.size()
+                        || columnsInOrder.get(i + 1).getStartingPos() != (start + length)) {
+                    // not contiguous. do the copy now.
+                    copy(from, out, start, length);
+                    // reset to start at the next column chunk
+                    start = -1;
+                    length = 0;
+                }
 
-        currentBlock.setTotalByteSize(blockUncompressedSize);
+                // TODO: column/offset indexes are not copied
+                // (it would require seeking to the end of the file for each row groups)
+                currentColumnIndexes.add(null);
+                currentOffsetIndexes.add(null);
+
+                Offsets offsets = Offsets.getOffsets(from, chunk, newChunkStart);
+                currentBlock.addColumn(ColumnChunkMetaData.get(
+                        chunk.getPath(),
+                        chunk.getPrimitiveType(),
+                        chunk.getCodec(),
+                        chunk.getEncodingStats(),
+                        chunk.getEncodings(),
+                        chunk.getStatistics(),
+                        offsets.firstDataPageOffset,
+                        offsets.dictionaryPageOffset,
+                        chunk.getValueCount(),
+                        chunk.getTotalSize(),
+                        chunk.getTotalUncompressedSize()));
+
+                blockUncompressedSize += chunk.getTotalUncompressedSize();
+            }
 
-        endBlock();
+            currentBlock.setTotalByteSize(blockUncompressedSize);
+
+            endBlock();
+        });
     }
 
     /**
@@ -1578,36 +1641,41 @@ public void appendColumnChunk(
             ColumnIndex columnIndex,
             OffsetIndex offsetIndex)
             throws IOException {
-        long start = chunk.getStartingPos();
-        long length = chunk.getTotalSize();
-        long newChunkStart = out.getPos();
+        withAbortOnFailure(() -> {
+            long start = chunk.getStartingPos();
+            long length = chunk.getTotalSize();
+            long newChunkStart = out.getPos();
 
-        if (offsetIndex != null && newChunkStart != start) {
-            offsetIndex =
-                    OffsetIndexBuilder.getBuilder().fromOffsetIndex(offsetIndex).build(newChunkStart - start);
-        }
+            OffsetIndex effectiveOffsetIndex = offsetIndex;
 
-        copy(from, out, start, length);
+            if (effectiveOffsetIndex != null && newChunkStart != start) {
+                effectiveOffsetIndex = OffsetIndexBuilder.getBuilder()
+                        .fromOffsetIndex(effectiveOffsetIndex)
+                        .build(newChunkStart - start);
+            }
 
-        currentBloomFilters.put(String.join(".", descriptor.getPath()), bloomFilter);
-        currentColumnIndexes.add(columnIndex);
-        currentOffsetIndexes.add(offsetIndex);
+            copy(from, out, start, length);
 
-        Offsets offsets = Offsets.getOffsets(from, chunk, newChunkStart);
-        currentBlock.addColumn(ColumnChunkMetaData.get(
-                chunk.getPath(),
-                chunk.getPrimitiveType(),
-                chunk.getCodec(),
-                chunk.getEncodingStats(),
-                chunk.getEncodings(),
-                chunk.getStatistics(),
-                offsets.firstDataPageOffset,
-                offsets.dictionaryPageOffset,
-                chunk.getValueCount(),
-                chunk.getTotalSize(),
-                chunk.getTotalUncompressedSize()));
+            currentBloomFilters.put(String.join(".", descriptor.getPath()), bloomFilter);
+            currentColumnIndexes.add(columnIndex);
+            currentOffsetIndexes.add(effectiveOffsetIndex);
+
+            Offsets offsets = Offsets.getOffsets(from, chunk, newChunkStart);
+            currentBlock.addColumn(ColumnChunkMetaData.get(
+                    chunk.getPath(),
+                    chunk.getPrimitiveType(),
+                    chunk.getCodec(),
+                    chunk.getEncodingStats(),
+                    chunk.getEncodings(),
+                    chunk.getStatistics(),
+                    offsets.firstDataPageOffset,
+                    offsets.dictionaryPageOffset,
+                    chunk.getValueCount(),
+                    chunk.getTotalSize(),
+                    chunk.getTotalUncompressedSize()));
 
-        currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + chunk.getTotalUncompressedSize());
+            currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + chunk.getTotalUncompressedSize());
+        });
     }
 
     // Buffers for the copy function.
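The appendRowGroup body wrapped above keeps its existing copy strategy: consecutive column chunks whose byte ranges are contiguous in the source file are merged into a single copy call, and the pending range is flushed as soon as the next chunk is not adjacent (or the last chunk is reached). A standalone sketch of that batching logic follows; Chunk and RangeCopier are illustrative stand-ins, not Parquet types.

// Illustrative sketch of the contiguous-range batching used by appendRowGroup.
import java.util.List;

class RangeBatcher {
    static class Chunk {
        final long start;
        final long size;

        Chunk(long start, long size) {
            this.start = start;
            this.size = size;
        }
    }

    interface RangeCopier {
        void copyRange(long start, long length);
    }

    static void copyChunks(List<Chunk> chunks, RangeCopier copier) {
        long start = -1;
        long length = 0;
        for (int i = 0; i < chunks.size(); i++) {
            Chunk chunk = chunks.get(i);
            if (start < 0) {
                start = chunk.start; // open a new batch at this chunk's starting position
            }
            length += chunk.size;
            boolean last = (i + 1) == chunks.size();
            boolean contiguous = !last && chunks.get(i + 1).start == start + length;
            if (last || !contiguous) {
                copier.copyRange(start, length); // flush the accumulated range in one copy
                start = -1;
                length = 0;
            }
        }
    }
}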
@@ -1647,17 +1715,25 @@ private static void copy(SeekableInputStream from, PositionOutputStream to, long
      * @throws IOException if there is an error while writing
      */
     public void end(Map<String, String> extraMetaData) throws IOException {
-        try {
-            state = state.end();
-            serializeColumnIndexes(columnIndexes, blocks, out, fileEncryptor);
-            serializeOffsetIndexes(offsetIndexes, blocks, out, fileEncryptor);
-            serializeBloomFilters(bloomFilters, blocks, out, fileEncryptor);
-            LOG.debug("{}: end", out.getPos());
-            this.footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks);
-            serializeFooter(footer, out, fileEncryptor, metadataConverter);
-        } finally {
-            close();
-        }
+        withAbortOnFailure(() -> {
+            try {
+                state = state.end();
+                serializeColumnIndexes(columnIndexes, blocks, out, fileEncryptor);
+                serializeOffsetIndexes(offsetIndexes, blocks, out, fileEncryptor);
+                serializeBloomFilters(bloomFilters, blocks, out, fileEncryptor);
+                LOG.debug("{}: end", out.getPos());
+                this.footer =
+                        new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks);
+                serializeFooter(footer, out, fileEncryptor, metadataConverter);
+            } finally {
+                close();
+            }
+        });
+    }
+
+    /* Mark the writer as aborted to avoid flushing incomplete data. */
+    public void abort() {
+        aborted = true;
     }
 
     @Override
@@ -1665,8 +1741,13 @@ public void close() throws IOException {
         if (closed) {
             return;
         }
-        try (PositionOutputStream temp = out) {
-            temp.flush();
+
+        try {
+            if (!aborted) {
+                try (PositionOutputStream temp = out) {
+                    temp.flush();
+                }
+            }
             if (crcAllocator != null) {
                 crcAllocator.close();
             }
@@ -2117,11 +2198,11 @@ static ParquetMetadata mergeFooters(
      * @throws IOException if there is an error while getting the current stream's position
      */
     public long getPos() throws IOException {
-        return out.getPos();
+        return withAbortOnFailure(() -> out.getPos());
     }
 
     public long getNextRowGroupSize() throws IOException {
-        return alignment.nextRowGroupSize(out);
+        return withAbortOnFailure(() -> alignment.nextRowGroupSize(out));
     }
 
     /**
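The test added below flips the internal record writer's private aborted flag through reflection rather than provoking a real I/O failure, then closes the writer normally and asserts that the output file is either absent or empty. A standalone sketch of that reflective toggle (the helper class and method names here are hypothetical):

// Standalone sketch (hypothetical helper, not part of the patch) of the reflection
// trick used by the test: flip a private boolean field on an object held only as
// Object, to simulate an aborted writer without injecting a real I/O failure.
import java.lang.reflect.Field;

class ReflectionToggle {
    static void setPrivateBoolean(Object target, String fieldName, boolean value)
            throws ReflectiveOperationException {
        Field field = target.getClass().getDeclaredField(fieldName);
        field.setAccessible(true); // test-only bypass of the private modifier
        field.setBoolean(target, value);
    }
}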
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
index 64001bcaf2..09384758d5 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
@@ -42,6 +42,7 @@
 import com.google.common.collect.ImmutableMap;
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -50,6 +51,7 @@
 import net.openhft.hashing.LongHashFunction;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.bytes.HeapByteBufferAllocator;
 import org.apache.parquet.bytes.TrackingByteBufferAllocator;
@@ -592,4 +594,42 @@ public void testSizeStatisticsControl() throws Exception {
             }
         }
     }
+
+    @Test
+    public void testNoFlushAfterException() throws Exception {
+        final File testDir = temp.newFile();
+        testDir.delete();
+
+        final Path file = new Path(testDir.getAbsolutePath(), "test.parquet");
+
+        MessageType schema = Types.buildMessage()
+                .required(BINARY)
+                .named("binary_field")
+                .required(INT32)
+                .named("int32_field")
+                .named("test_schema_abort");
+        Configuration conf = new Configuration();
+
+        try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(new Path(file.toString()))
+                .withAllocator(allocator)
+                .withType(schema)
+                .build()) {
+
+            SimpleGroupFactory f = new SimpleGroupFactory(schema);
+            writer.write(f.newGroup().append("binary_field", "hello").append("int32_field", 123));
+
+            Field internalWriterField = ParquetWriter.class.getDeclaredField("writer");
+            internalWriterField.setAccessible(true);
+            Object internalWriter = internalWriterField.get(writer);
+
+            Field abortedField = internalWriter.getClass().getDeclaredField("aborted");
+            abortedField.setAccessible(true);
+            abortedField.setBoolean(internalWriter, true);
+            writer.close();
+        }
+
+        // After closing, check whether file exists or is empty
+        FileSystem fs = file.getFileSystem(conf);
+        assertTrue(!fs.exists(file) || fs.getFileStatus(file).getLen() == 0);
+    }
 }
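From a caller's point of view, the change means a failed write should no longer leave behind a file that looks finished. The sketch below uses the real example-writer API (ExampleParquetWriter, Group), but the surrounding error-handling narrative is an assumption for illustration, not part of the patch: if an IOException raised by ParquetFileWriter escapes write(), or the final flush during close() fails, the file writer ends up marked aborted and its close() skips the final flush instead of forcing partial bytes and a footer out to storage.

// Illustrative usage sketch (not part of the patch) of the intended caller-visible behaviour.
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.schema.MessageType;

class AbortAwareWriteExample {
    static void writeAll(Path file, MessageType schema, Iterable<Group> rows) throws IOException {
        try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
                .withType(schema)
                .build()) {
            for (Group row : rows) {
                // A failing low-level write marks the underlying file writer as aborted,
                // so the implicit close() below will not flush incomplete data.
                writer.write(row);
            }
        }
        // Only a successfully closed writer leaves a complete footer behind, so a file
        // that reaches this point can be read back normally.
    }
}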