diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/TrackingByteBufferAllocator.java b/parquet-common/src/main/java/org/apache/parquet/bytes/TrackingByteBufferAllocator.java
index d46073551d..efd557f2fa 100644
--- a/parquet-common/src/main/java/org/apache/parquet/bytes/TrackingByteBufferAllocator.java
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/TrackingByteBufferAllocator.java
@@ -22,6 +22,8 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A wrapper {@link ByteBufferAllocator} implementation that tracks whether all allocated buffers are released. It
@@ -37,7 +39,9 @@ public final class TrackingByteBufferAllocator implements ByteBufferAllocator, A
    *
    * @see ByteBufferAllocationStacktraceException
    */
-  private static final boolean DEBUG = false;
+  private static final boolean DEBUG = true;
+
+  private static final Logger LOG = LoggerFactory.getLogger(TrackingByteBufferAllocator.class);
 
   public static TrackingByteBufferAllocator wrap(ByteBufferAllocator allocator) {
     return new TrackingByteBufferAllocator(allocator);
@@ -69,6 +73,11 @@ public boolean equals(Object o) {
     public int hashCode() {
       return hashCode;
     }
+
+    @Override
+    public String toString() {
+      return buffer.toString();
+    }
   }
 
   public static class LeakDetectorHeapByteBufferAllocatorException extends RuntimeException {
@@ -133,14 +142,22 @@ private TrackingByteBufferAllocator(ByteBufferAllocator allocator) {
   @Override
   public ByteBuffer allocate(int size) {
     ByteBuffer buffer = allocator.allocate(size);
-    allocated.put(new Key(buffer), ByteBufferAllocationStacktraceException.create());
+    final ByteBufferAllocationStacktraceException ex = ByteBufferAllocationStacktraceException.create();
+    final Key key = new Key(buffer);
+    allocated.put(key, ex);
+    LOG.debug("Creating ByteBuffer:{} size {} {}", key.hashCode(), size, buffer);
+    if (DEBUG) {
+      LOG.debug("Stack", ex);
+    }
     return buffer;
   }
 
   @Override
   public void release(ByteBuffer b) throws ReleasingUnallocatedByteBufferException {
     Objects.requireNonNull(b);
-    if (allocated.remove(new Key(b)) == null) {
+    final Key key = new Key(b);
+    LOG.debug("Releasing ByteBuffer: {}: {}", key.hashCode(), b);
+    if (allocated.remove(key) == null) {
       throw new ReleasingUnallocatedByteBufferException();
     }
     allocator.release(b);
@@ -156,6 +173,7 @@ public boolean isDirect() {
   @Override
   public void close() throws LeakedByteBufferException {
     if (!allocated.isEmpty()) {
+      allocated.keySet().forEach(key -> LOG.warn("Unreleased ByteBuffer {}; {}", key.hashCode(), key));
       LeakedByteBufferException ex = new LeakedByteBufferException(
           allocated.size(), allocated.values().iterator().next());
       allocated.clear(); // Drop the references to the ByteBuffers, so they can be gc'd
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index b12a819cdd..43bc618902 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -1308,10 +1308,15 @@ private void readVectored(List<ConsecutivePartList> allParts, ChunkListBuilder b
     LOG.debug("Reading {} bytes of data with vectored IO in {} ranges", totalSize, ranges.size());
     // Request a vectored read;
     f.readVectored(ranges, options.getAllocator());
+    List<ByteBuffer> buffers = new ArrayList<>(allParts.size());
     int k = 0;
-    for (ConsecutivePartList consecutivePart : allParts) {
-      ParquetFileRange currRange = ranges.get(k++);
-      consecutivePart.readFromVectoredRange(currRange, builder);
+    try {
+      for (ConsecutivePartList consecutivePart : allParts) {
+        ParquetFileRange currRange = ranges.get(k++);
+        buffers.add(consecutivePart.readFromVectoredRange(currRange, builder));
+      }
+    } finally {
+      builder.addBuffersToRelease(buffers);
     }
   }
 
@@ -2241,11 +2246,16 @@ private void setReadMetrics(long startNs, long len) {
     /**
      * Populate data in a parquet file range from a vectored range; will block for up
      * to {@link #HADOOP_VECTORED_READ_TIMEOUT_SECONDS} seconds.
+     *
      * @param currRange range to populated.
      * @param builder used to build chunk list to read the pages for the different columns.
+     *
+     * @return the buffer, for queuing for release later.
+     *
      * @throws IOException if there is an error while reading from the stream, including a timeout.
      */
-    public void readFromVectoredRange(ParquetFileRange currRange, ChunkListBuilder builder) throws IOException {
+    public ByteBuffer readFromVectoredRange(ParquetFileRange currRange, ChunkListBuilder builder)
+        throws IOException {
       ByteBuffer buffer;
       final long timeoutSeconds = HADOOP_VECTORED_READ_TIMEOUT_SECONDS;
       long readStart = System.nanoTime();
@@ -2268,6 +2278,7 @@ public void readFromVectoredRange(ParquetFileRange currRange, ChunkListBuilder b
       for (ChunkDescriptor descriptor : chunks) {
         builder.add(descriptor, stream.sliceBuffers(descriptor.size), f);
       }
+      return buffer;
     }
 
     /**
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/crypto/TestPropertiesDrivenEncryption.java b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/TestPropertiesDrivenEncryption.java
index 76b8be1f7e..573c42737e 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/crypto/TestPropertiesDrivenEncryption.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/TestPropertiesDrivenEncryption.java
@@ -28,6 +28,7 @@
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.UncheckedIOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -41,6 +42,7 @@
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.bytes.DirectByteBufferAllocator;
 import org.apache.parquet.bytes.HeapByteBufferAllocator;
@@ -529,6 +531,14 @@ private void writeEncryptedParquetFile(
     } catch (Exception e) {
       addErrorToErrorCollectorAndLog("Failed writing " + file.toString(), e, encryptionConfiguration, null);
     }
+    // remove the CRC file so that Hadoop local filesystem doesn't slice buffers on
+    // vector reads.
+    try {
+      final LocalFileSystem local = FileSystem.getLocal(new Configuration());
+      local.delete(local.getChecksumFile(file), false);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
   }
 
   private Path getFileName(Path root, EncryptionConfiguration encryptionConfiguration, int threadNumber) {
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java
index 1a1a31e73c..3a7ce40f1d 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java
@@ -130,6 +130,8 @@ public static List makeUsers() {
   public static void setup() throws IOException {
     users = makeUsers();
     phonebookFile = PhoneBookWriter.writeToFile(users);
+    // remove the CRC file
+    new File(phonebookFile.getParentFile(), "." + phonebookFile.getName() + ".crc").delete();
   }
 
   private static interface UserFilter {
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java
index 154dd6f5c5..352b71db3c 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java
@@ -64,6 +64,8 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.bytes.HeapByteBufferAllocator;
 import org.apache.parquet.bytes.TrackingByteBufferAllocator;
@@ -363,6 +365,10 @@ private static void writePhoneBookToFile(
             .withWriterVersion(parquetVersion),
         DATA);
     }
+    // remove the CRC file so that Hadoop local filesystem doesn't slice buffers on
+    // vector reads.
+    final LocalFileSystem local = FileSystem.getLocal(new Configuration());
+    local.delete(local.getChecksumFile(file), false);
   }
 
   private static FileEncryptionProperties getFileEncryptionProperties() {
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java
index db14f69150..680389fe92 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java
@@ -21,6 +21,7 @@
 import static org.apache.parquet.filter2.predicate.FilterApi.in;
 import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
 import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
+import static org.apache.parquet.hadoop.ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED;
 import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
@@ -34,6 +35,8 @@
 import java.util.List;
 import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.bytes.HeapByteBufferAllocator;
 import org.apache.parquet.bytes.TrackingByteBufferAllocator;
@@ -50,6 +53,8 @@
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @RunWith(Parameterized.class)
 public class TestParquetReader {
@@ -59,8 +64,10 @@ public class TestParquetReader {
   private static final Path STATIC_FILE_WITHOUT_COL_INDEXES =
       createPathFromCP("/test-file-with-no-column-indexes-1.parquet");
   private static final List DATA = Collections.unmodifiableList(makeUsers(1000));
+  private static final Logger LOG = LoggerFactory.getLogger(TestParquetReader.class);
 
   private final Path file;
+  private final boolean vectoredRead;
   private final long fileSize;
 
   private TrackingByteBufferAllocator allocator;
@@ -72,15 +79,19 @@ private static Path createPathFromCP(String path) {
     }
   }
 
-  public TestParquetReader(Path file) throws IOException {
+  public TestParquetReader(Path file, final boolean vectoredRead) throws IOException {
     this.file = file;
+    this.vectoredRead = vectoredRead;
     this.fileSize = file.getFileSystem(new Configuration()).getFileStatus(file).getLen();
   }
 
-  @Parameterized.Parameters
+  @Parameterized.Parameters(name = "file={0} vector={1}")
   public static Collection data() {
-    Object[][] data = new Object[][] {{FILE_V1}, {FILE_V2}, {STATIC_FILE_WITHOUT_COL_INDEXES}};
+    Object[][] data = new Object[][] {
+      {FILE_V1, false}, {FILE_V2, false}, {STATIC_FILE_WITHOUT_COL_INDEXES, false},
+      {FILE_V1, true}, {FILE_V2, true}, {STATIC_FILE_WITHOUT_COL_INDEXES, true}
+    };
     return Arrays.asList(data);
   }
 
@@ -96,6 +107,11 @@ public static void deleteFiles() throws IOException {
     deleteFile(FILE_V1);
     deleteFile(FILE_V2);
   }
 
+  @Before
+  public void setup() throws IOException {
+    LOG.info("Test run with file {}, size {}; vectored={}", file, fileSize, vectoredRead);
+  }
+
   private static void deleteFile(Path file) throws IOException {
     file.getFileSystem(new Configuration()).delete(file, false);
   }
@@ -139,6 +155,10 @@ private static void writePhoneBookToFile(Path file, ParquetProperties.WriterVers
             .withPageSize(pageSize)
             .withWriterVersion(parquetVersion),
         DATA);
+    // remove the CRC file so that Hadoop local filesystem doesn't slice buffers on
+    // vector reads.
+    final LocalFileSystem local = FileSystem.getLocal(new Configuration());
+    local.delete(local.getChecksumFile(file), false);
   }
 
   private List readUsers(
@@ -153,8 +173,11 @@ private List readUsers(
       long rangeStart,
       long rangeEnd)
       throws IOException {
+    final Configuration conf = new Configuration();
+    conf.setBoolean(HADOOP_VECTORED_IO_ENABLED, vectoredRead);
     return PhoneBookWriter.readUsers(
         ParquetReader.builder(new GroupReadSupport(), file)
+            .withConf(conf)
            .withAllocator(allocator)
            .withFilter(filter)
            .useDictionaryFilter(useOtherFiltering)
@@ -179,22 +202,22 @@ public void closeAllocator() {
   public void testCurrentRowIndex() throws Exception {
     ParquetReader reader = PhoneBookWriter.createReader(file, FilterCompat.NOOP, allocator);
     // Fetch row index without processing any row.
-    assertEquals(reader.getCurrentRowIndex(), -1);
+    assertEquals(-1, reader.getCurrentRowIndex());
     reader.read();
-    assertEquals(reader.getCurrentRowIndex(), 0);
+    assertEquals(0, reader.getCurrentRowIndex());
     // calling the same API again and again should return same result.
-    assertEquals(reader.getCurrentRowIndex(), 0);
+    assertEquals(0, reader.getCurrentRowIndex());
     reader.read();
-    assertEquals(reader.getCurrentRowIndex(), 1);
-    assertEquals(reader.getCurrentRowIndex(), 1);
+    assertEquals(1, reader.getCurrentRowIndex());
+    assertEquals(1, reader.getCurrentRowIndex());
     long expectedCurrentRowIndex = 2L;
     while (reader.read() != null) {
       assertEquals(reader.getCurrentRowIndex(), expectedCurrentRowIndex);
       expectedCurrentRowIndex++;
     }
     // reader.read() returned null and so reader doesn't have any more rows.
-    assertEquals(reader.getCurrentRowIndex(), -1);
+    assertEquals(-1, reader.getCurrentRowIndex());
   }
 
   @Test
@@ -214,13 +237,13 @@ public void testSimpleFiltering() throws Exception {
     // The readUsers also validates the rowIndex for each returned row.
     List filteredUsers1 =
         readUsers(FilterCompat.get(in(longColumn("id"), idSet)), true, true);
-    assertEquals(filteredUsers1.size(), 2L);
+    assertEquals(2L, filteredUsers1.size());
     List filteredUsers2 =
         readUsers(FilterCompat.get(in(longColumn("id"), idSet)), true, false);
-    assertEquals(filteredUsers2.size(), 2L);
+    assertEquals(2L, filteredUsers2.size());
     List filteredUsers3 =
         readUsers(FilterCompat.get(in(longColumn("id"), idSet)), false, false);
-    assertEquals(filteredUsers3.size(), 1000L);
+    assertEquals(1000L, filteredUsers3.size());
   }
 
   @Test
diff --git a/pom.xml b/pom.xml
index ce0f1adf50..5411a73d53 100644
--- a/pom.xml
+++ b/pom.xml
@@ -84,7 +84,7 @@
     2.30.0
     shaded.parquet
-    3.3.0
+    3.4.1
     2.11.0
     1.15.1
     thrift