Description
Another variation on #3617 that was fixed in 2.1.2.
This occurred while testing a new Datawave feature using Accumulo 2.1.3 and Hadoop 3.3.5. In RFile$LocalityGroupReader._seek or _next, when calling currBlock = getDataBlock(indexEntry); (line 855 or 1044), if the thread is interrupted while inside the HDFS code in DFSInputStream.blockSeekTo (DFSInputStream.java:645), an InterruptedIOException is thrown and the DFSInputStream.blockReader field is left null, which causes a NullPointerException if the stream is used again.
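To make the failure mode concrete, here is a simplified stand-in (not the actual Hadoop code) for the behavior described above: blockSeekTo discards the old block reader before building a new one, so an InterruptedIOException thrown while connecting leaves the field null, and a later use of the same stream dereferences it.

```java
import java.io.InterruptedIOException;

// Simplified stand-in for the DFSInputStream behavior described above;
// names mirror the real fields/methods but the logic is illustrative only.
class StandInInputStream {
    private Object blockReader; // stands in for DFSInputStream.blockReader

    private Object buildRemoteBlockReader() throws InterruptedIOException {
        // Stands in for BlockReaderFactory.build(); fails if the calling
        // thread was interrupted while waiting on the datanode socket.
        if (Thread.currentThread().isInterrupted()) {
            throw new InterruptedIOException("Interrupted while waiting for IO");
        }
        return new Object();
    }

    void blockSeekTo(long target) throws InterruptedIOException {
        blockReader = null;                      // old reader discarded first
        blockReader = buildRemoteBlockReader();  // may throw, leaving the field null
    }

    void seek(long target) {
        // Stands in for DFSInputStream.seek(): assumes blockReader is valid,
        // so a reuse of the stream after the failed blockSeekTo() hits an NPE.
        blockReader.hashCode();
    }
}
```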
I think what happened is that when the Datawave code was yielding in an Ivarator (a separate thread), I cancelled the Future wrapping the Ivarator Runnable with interrupt=true. The InterruptedIOException did propagate up through RFile$LocalityGroupReader, but it did not get propagated back to the LookupTask.
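For context, a minimal sketch (hypothetical names, not the Datawave code) of how cancelling the Ivarator's Future with interrupt=true delivers an interrupt to a worker thread that is blocked in I/O, which is what surfaces as the InterruptedIOException in the trace below:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CancelInterruptSketch {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();

        // Stands in for the Ivarator Runnable that drives the RFile reader.
        Future<?> ivaratorTask = pool.submit(() -> {
            try {
                // Stands in for a blocking read through RFile -> DFSInputStream.
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                // In the real path this surfaces as an InterruptedIOException
                // thrown out of DFSInputStream.blockSeekTo().
                System.out.println("worker interrupted mid-read");
            }
        });

        Thread.sleep(100);
        ivaratorTask.cancel(true); // interrupt=true delivers Thread.interrupt()
        pool.shutdown();
    }
}
```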
If propagating that exception is enough to keep subsequent calls from using the RFile, then I think the Accumulo code is OK, but I have not verified this. If the cleanup code in RFile.LocalityGroupReader.reset should be able to deal with the exception occurring on a separate thread, then there may be an issue, which is probably the same issue that was previously addressed.
I think that if the RFile gets re-used, then even though CachableBlockFile.Reader.bcfr gets set to null when RFile.LocalityGroupReader.reset() calls CachableBlockFile.Reader.close() on an exception, a subsequent call to CachableBlockFile.Reader.getBCFile(byte[] serializedMetadata) will re-populate bcfr using the InputStream from CachableBlockFile.Reader.inputSupplier. I have not traced this yet, but I think that is how the code works, and it is my best explanation for how I am hitting this same condition again even though the improved cleanup code that nulls out bcfr and closes the reader is in place.
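For illustration, a minimal, hypothetical sketch of the re-population pattern I suspect (simplified, not the actual CachableBlockFile code): after close() nulls the BCFile.Reader reference, a later getBCFile() call lazily rebuilds it from the supplied input stream, which by then is the broken DFSInputStream.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical sketch of the suspected lazy re-open pattern; the real
// CachableBlockFile.Reader is more involved.
class ReaderSketch {
    private final AtomicReference<Object> bcfr = new AtomicReference<>();
    private final Supplier<InputStream> inputSupplier; // same (now broken) stream source

    ReaderSketch(Supplier<InputStream> inputSupplier) {
        this.inputSupplier = inputSupplier;
    }

    Object getBCFile(byte[] serializedMetadata) throws IOException {
        Object reader = bcfr.get();
        if (reader == null) {
            // Re-populate from the supplier even though close() already ran,
            // handing back a reader wrapped around the corrupted stream.
            bcfr.compareAndSet(null, openBCFileReader(inputSupplier.get(), serializedMetadata));
            reader = bcfr.get();
        }
        return reader;
    }

    private Object openBCFileReader(InputStream in, byte[] serializedMetadata) {
        return new Object(); // stands in for constructing a new BCFile.Reader
    }
}
```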
For reference, the cleanup code in RFile.LocalityGroupReader.reset that the catch blocks in _seek and _next call:
private void reset(boolean exceptionThrown) {
  rk = null;
  hasTop = false;
  if (currBlock != null) {
    try {
      try {
        currBlock.close();
        if (exceptionThrown) {
          reader.close();
        }
      } catch (IOException e) {
        log.warn("Failed to close block reader", e);
      }
    } finally {
      currBlock = null;
    }
  }
}
which in turn calls CachableBlockFile.Reader.close():
public synchronized void close() throws IOException {
  if (closed) {
    return;
  }
  closed = true;

  BCFile.Reader reader = bcfr.getAndSet(null);
  if (reader != null) {
    reader.close();
  }

  if (fin != null) {
    // synchronize on the FSDataInputStream to ensure thread safety with the
    // BoundedRangeFileInputStream
    synchronized (fin) {
      fin.close();
    }
  }
}
Initial java.io.InterruptedIOException
2025-01-09T17:22:23,974 [DATAWAVE Ivarator (4fb08bee)-6658 -> DatawaveFieldIndexRegexIteratorJexl in c259f7d7-e4d8-4aa4-82c0-a646fc7e3c6f_168b80f_[20170716_0 fi%00;REVISION_TEXT_TOKEN:Q%85;?%ea; [] 9223372036854775807 false,20170716_0 fi%00;REVISION_TEXT_TOKEN:%a3;%0a;%7f;%d4; [] 9223372036854775807 false)] [impl.BlockReaderFactory] WARN : I/O error constructing remote block reader.
java.io.InterruptedIOException: Interrupted while waiting for IO on channel java.nio.channels.SocketChannel[connection-pending remote=/10.117.191.71:9866]. Total timeout mills is 60000, 60000 millis timeout left.
at org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.select(SocketIOWithTimeout.java:350) ~[hadoop-client-api-3.3.5.jar:?]
at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:202) ~[hadoop-client-api-3.3.5.jar:?]
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:600) ~[hadoop-client-api-3.3.5.jar:?]
at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:3033) ~[hadoop-client-api-3.3.5.jar:?]
at org.apache.hadoop.hdfs.client.impl.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:829) ~[hadoop-client-api-3.3.5.jar:?]
at org.apache.hadoop.hdfs.client.impl.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:754) ~[hadoop-client-api-3.3.5.jar:?]
at org.apache.hadoop.hdfs.client.impl.BlockReaderFactory.build(BlockReaderFactory.java:381) ~[hadoop-client-api-3.3.5.jar:?]
at org.apache.hadoop.hdfs.DFSInputStream.getBlockReader(DFSInputStream.java:715) ~[hadoop-client-api-3.3.5.jar:?]
at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:645) ~[hadoop-client-api-3.3.5.jar:?]
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:845) ~[hadoop-client-api-3.3.5.jar:?]
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:918) ~[hadoop-client-api-3.3.5.jar:?]
at java.base/java.io.DataInputStream.read(DataInputStream.java:149) ~[?:?]
at org.apache.accumulo.core.file.streams.RateLimitedInputStream.read(RateLimitedInputStream.java:52) ~[accumulo-core-2.1.3.jar:2.1.3]
at java.base/java.io.DataInputStream.read(DataInputStream.java:149) ~[?:?]
at org.apache.accumulo.core.file.streams.BoundedRangeFileInputStream.read(BoundedRangeFileInputStream.java:98) ~[accumulo-core-2.1.3.jar:2.1.3]
at org.apache.hadoop.io.compress.DecompressorStream.getCompressedData(DecompressorStream.java:179) ~[hadoop-client-api-3.3.5.jar:?]
at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:163) ~[hadoop-client-api-3.3.5.jar:?]
at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105) ~[hadoop-client-api-3.3.5.jar:?]
at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:290) ~[?:?]
at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:351) ~[?:?]
at java.base/java.io.DataInputStream.readFully(DataInputStream.java:200) ~[?:?]
at java.base/java.io.DataInputStream.readFully(DataInputStream.java:170) ~[?:?]
at org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile$Reader$BaseBlockLoader.load(CachableBlockFile.java:362) ~[accumulo-core-2.1.3.jar:2.1.3]
at org.apache.accumulo.core.file.blockfile.cache.lru.SynchronousLoadingBlockCache.getBlock(SynchronousLoadingBlockCache.java:126) ~[accumulo-core-2.1.3.jar:2.1.3]
at org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile$Reader.getDataBlock(CachableBlockFile.java:460) ~[accumulo-core-2.1.3.jar:2.1.3]
at org.apache.accumulo.core.file.rfile.RFile$LocalityGroupReader.getDataBlock(RFile.java:893) ~[accumulo-core-2.1.3.jar:2.1.3]
at org.apache.accumulo.core.file.rfile.RFile$LocalityGroupReader._next(RFile.java:855) ~[accumulo-core-2.1.3.jar:2.1.3]
at org.apache.accumulo.core.file.rfile.RFile$LocalityGroupReader.next(RFile.java:833) ~[accumulo-core-2.1.3.jar:2.1.3]
at org.apache.accumulo.core.iteratorsImpl.system.HeapIterator.next(HeapIterator.java:75) ~[accumulo-core-2.1.3.jar:2.1.3]
at org.apache.accumulo.server.problems.ProblemReportingIterator.next(ProblemReportingIterator.java:86) ~[accumulo-server-base-2.1.3.jar:2.1.3]
at org.apache.accumulo.core.iteratorsImpl.system.HeapIterator.next(HeapIterator.java:75) ~[accumulo-core-2.1.3.jar:2.1.3]
at org.apache.accumulo.core.iteratorsImpl.system.StatsIterator.next(StatsIterator.java:51) ~[accumulo-core-2.1.3.jar:2.1.3]
at org.apache.accumulo.core.iteratorsImpl.system.DeletingIterator.next(DeletingIterator.java:65) ~[accumulo-core-2.1.3.jar:2.1.3]
at org.apache.accumulo.core.iterators.ServerSkippingIterator.next(ServerSkippingIterator.java:45) ~[accumulo-core-2.1.3.jar:2.1.3]
at org.apache.accumulo.core.iterators.SynchronizedServerFilter.next(SynchronizedServerFilter.java:51) ~[accumulo-core-2.1.3.jar:2.1.3]
at datawave.iterators.PropogatingIterator.next(PropogatingIterator.java:251) ~[?:?]
at org.apache.accumulo.core.iterators.WrappingIterator.next(WrappingIterator.java:100) ~[accumulo-core-2.1.3.jar:2.1.3]
at org.apache.accumulo.core.iterators.user.VersioningIterator.skipRowColumn(VersioningIterator.java:103) ~[accumulo-core-2.1.3.jar:2.1.3]
at org.apache.accumulo.core.iterators.user.VersioningIterator.next(VersioningIterator.java:60) ~[accumulo-core-2.1.3.jar:2.1.3]
at datawave.core.iterators.IvaratorRunnable.run(IvaratorRunnable.java:225) ~[?:?]
at datawave.core.iterators.IteratorThreadPoolManager.lambda$execute$4(IteratorThreadPoolManager.java:194) ~[?:?]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
at java.base/java.lang.Thread.run(Thread.java:829) [?:?]
Subsequent call producing an NPE
2025-01-09T17:22:19,646 [DATAWAVE Ivarator (4fb08bee)-6667 -> DatawaveFieldIndexRegexIteratorJexl in c259f7d7-e4d8-4aa4-82c0-a646fc7e3c6f_168b80f_[20170716_0 fi%00;REVISION_TEXT_TOKEN:%a3;%0a;%7f;%d4; [] 9223372036854775807 false,20170716_0 fi%00;REVISION_TEXT_TOKEN:%f4;%8f;%bf;%bf; [] 9223372036854775807 false]] [iterators.IvaratorRunnable] ERROR: Failed to complete fillSet([20170716_0 fi%00;REVISION_TEXT_TOKEN:%a3;%0a;%7f;%d4; [] 9223372036854775807 false,20170716_0 fi%00;REVISION_TEXT_TOKEN:%f4;%8f;%bf;%bf; [] 9223372036854775807 false])
java.lang.NullPointerException: null
at org.apache.hadoop.hdfs.DFSInputStream.seek(DFSInputStream.java:1575) ~[hadoop-client-api-3.3.5.jar:?]
at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:73) ~[hadoop-client-api-3.3.5.jar:?]
at org.apache.accumulo.core.file.streams.RateLimitedInputStream.seek(RateLimitedInputStream.java:61) ~[accumulo-core-2.1.3.jar:2.1.3]
at org.apache.accumulo.core.file.streams.SeekableDataInputStream.seek(SeekableDataInputStream.java:38) ~[accumulo-core-2.1.3.jar:2.1.3]
at org.apache.accumulo.core.file.streams.BoundedRangeFileInputStream.read(BoundedRangeFileInputStream.java:97) ~[accumulo-core-2.1.3.jar:2.1.3]
at org.apache.hadoop.io.compress.DecompressorStream.getCompressedData(DecompressorStream.java:179) ~[hadoop-client-api-3.3.5.jar:?]
at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:163) ~[hadoop-client-api-3.3.5.jar:?]
at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105) ~[hadoop-client-api-3.3.5.jar:?]
at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:290) ~[?:?]
at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:351) ~[?:?]
at java.base/java.io.DataInputStream.readFully(DataInputStream.java:200) ~[?:?]
at java.base/java.io.DataInputStream.readFully(DataInputStream.java:170) ~[?:?]
at org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile$Reader$BaseBlockLoader.load(CachableBlockFile.java:362) ~[accumulo-core-2.1.3.jar:2.1.3]
at org.apache.accumulo.core.file.blockfile.cache.lru.SynchronousLoadingBlockCache.getBlock(SynchronousLoadingBlockCache.java:126) ~[accumulo-core-2.1.3.jar:2.1.3]
at org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile$Reader.getDataBlock(CachableBlockFile.java:460) ~[accumulo-core-2.1.3.jar:2.1.3]
at org.apache.accumulo.core.file.rfile.RFile$LocalityGroupReader.getDataBlock(RFile.java:893) ~[accumulo-core-2.1.3.jar:2.1.3]
at org.apache.accumulo.core.file.rfile.RFile$LocalityGroupReader._next(RFile.java:855) ~[accumulo-core-2.1.3.jar:2.1.3]
at org.apache.accumulo.core.file.rfile.RFile$LocalityGroupReader.next(RFile.java:833) ~[accumulo-core-2.1.3.jar:2.1.3]
at org.apache.accumulo.core.iteratorsImpl.system.HeapIterator.next(HeapIterator.java:75) ~[accumulo-core-2.1.3.jar:2.1.3]
at org.apache.accumulo.server.problems.ProblemReportingIterator.next(ProblemReportingIterator.java:86) ~[accumulo-server-base-2.1.3.jar:2.1.3]
at org.apache.accumulo.core.iteratorsImpl.system.HeapIterator.next(HeapIterator.java:75) ~[accumulo-core-2.1.3.jar:2.1.3]
at org.apache.accumulo.core.iteratorsImpl.system.StatsIterator.next(StatsIterator.java:51) ~[accumulo-core-2.1.3.jar:2.1.3]
at org.apache.accumulo.core.iteratorsImpl.system.DeletingIterator.next(DeletingIterator.java:65) ~[accumulo-core-2.1.3.jar:2.1.3]
at org.apache.accumulo.core.iterators.ServerSkippingIterator.next(ServerSkippingIterator.java:45) ~[accumulo-core-2.1.3.jar:2.1.3]
at org.apache.accumulo.core.iterators.SynchronizedServerFilter.next(SynchronizedServerFilter.java:51) ~[accumulo-core-2.1.3.jar:2.1.3]
at datawave.iterators.PropogatingIterator.next(PropogatingIterator.java:251) ~[?:?]
at org.apache.accumulo.core.iterators.WrappingIterator.next(WrappingIterator.java:100) ~[accumulo-core-2.1.3.jar:2.1.3]
at org.apache.accumulo.core.iterators.user.VersioningIterator.skipRowColumn(VersioningIterator.java:103) ~[accumulo-core-2.1.3.jar:2.1.3]
at org.apache.accumulo.core.iterators.user.VersioningIterator.next(VersioningIterator.java:60) ~[accumulo-core-2.1.3.jar:2.1.3]
at datawave.core.iterators.IvaratorRunnable.run(IvaratorRunnable.java:225) ~[?:?]
at datawave.core.iterators.IteratorThreadPoolManager.lambda$execute$4(IteratorThreadPoolManager.java:194) ~[?:?]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
at java.base/java.lang.Thread.run(Thread.java:829) [?:?]