diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/io/RandomAccessFileChannel.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/io/RandomAccessFileChannel.java index 5c7fcee6afb2..33b3ceed7443 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/io/RandomAccessFileChannel.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/io/RandomAccessFileChannel.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hdds.utils.io; +import java.io.Closeable; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; @@ -29,7 +30,7 @@ import org.slf4j.LoggerFactory; /** {@link RandomAccessFile} and its {@link FileChannel}. */ -public class RandomAccessFileChannel { +public class RandomAccessFileChannel implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(RandomAccessFileChannel.class); private File blockFile; @@ -47,9 +48,12 @@ public synchronized boolean isOpen() { /** Open the given file in read-only mode. */ public synchronized void open(File file) throws FileNotFoundException { Preconditions.assertNull(blockFile, "blockFile"); - blockFile = Objects.requireNonNull(file, "blockFile == null"); - raf = new RandomAccessFile(blockFile, "r"); - channel = raf.getChannel(); + final File f = Objects.requireNonNull(file, "blockFile == null"); + final RandomAccessFile newRaf = new RandomAccessFile(f, "r"); + final FileChannel newChannel = newRaf.getChannel(); + blockFile = f; + raf = newRaf; + channel = newChannel; } /** Similar to {@link FileChannel#position(long)}. */ @@ -86,22 +90,31 @@ public synchronized boolean read(ByteBuffer buffer) throws IOException { * In case of exception, this method catches the exception, logs a warning message, * and then continue closing the remaining resources. */ + @Override public synchronized void close() { - if (blockFile == null) { + final File fileToClose = blockFile; + if (fileToClose == null) { return; } blockFile = null; + try { - channel.close(); - channel = null; + if (channel != null) { + channel.close(); + } } catch (IOException e) { - LOG.warn("Failed to close channel for {}", blockFile, e); + LOG.warn("Failed to close channel for {}", fileToClose, e); + } finally { + channel = null; } try { - raf.close(); - raf = null; + if (raf != null) { + raf.close(); + } } catch (IOException e) { - LOG.warn("Failed to close RandomAccessFile for {}", blockFile, e); + LOG.warn("Failed to close RandomAccessFile for {}", fileToClose, e); + } finally { + raf = null; } } } diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/io/TestRandomAccessFileChannel.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/io/TestRandomAccessFileChannel.java new file mode 100644 index 000000000000..944ce752daa1 --- /dev/null +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/io/TestRandomAccessFileChannel.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils.io; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.lang.reflect.Field; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.nio.file.Path; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +class TestRandomAccessFileChannel { + @TempDir + private Path tempDir; + + @Test + void openFailureDoesNotLeaveOpenAndCloseIsSafe() { + final RandomAccessFileChannel c = new RandomAccessFileChannel(); + final File missing = tempDir.resolve("missing-file").toFile(); + + assertThrows(FileNotFoundException.class, () -> c.open(missing)); + assertFalse(c.isOpen()); + assertDoesNotThrow(c::close); + } + + @Test + void closeIsIdempotent() throws Exception { + final RandomAccessFileChannel c = new RandomAccessFileChannel(); + final File f = tempDir.resolve("file").toFile(); + try (RandomAccessFile raf = new RandomAccessFile(f, "rw")) { + raf.write(1); + } + + c.open(f); + assertTrue(c.isOpen()); + + assertDoesNotThrow(c::close); + assertFalse(c.isOpen()); + assertDoesNotThrow(c::close); + } + + @Test + void closeContinuesToCloseRafEvenIfChannelCloseFails() throws Exception { + final RandomAccessFileChannel c = new RandomAccessFileChannel(); + final File f = tempDir.resolve("file-to-close").toFile(); + try (RandomAccessFile raf = new RandomAccessFile(f, "rw")) { + raf.write(1); + } + + final TrackingRandomAccessFile trackingRaf = new TrackingRandomAccessFile(f); + setField(c, "blockFile", f); + setField(c, "channel", new FailingCloseFileChannel()); + setField(c, "raf", trackingRaf); + + assertDoesNotThrow(c::close); + assertTrue(trackingRaf.isClosed(), "raf.close() should still be called"); + } + + @Test + void closeDoesNotThrowWhenRafAndChannelAreNull() throws Exception { + final RandomAccessFileChannel c = new RandomAccessFileChannel(); + setField(c, "blockFile", tempDir.resolve("dummy").toFile()); + setField(c, "channel", null); + setField(c, "raf", null); + assertDoesNotThrow(c::close); + } + + @Test + void readWithZeroSizedBuffer() throws Exception { + final RandomAccessFileChannel c = new RandomAccessFileChannel(); + final File f = tempDir.resolve("test-file").toFile(); + try (RandomAccessFile raf = new RandomAccessFile(f, "rw")) { + raf.write(new byte[]{1, 2, 3, 4, 5}); + } + + c.open(f); + assertTrue(c.isOpen()); + + final ByteBuffer zeroSizedBuffer = ByteBuffer.allocate(0); + // Should return immediately without reading (buffer has no remaining capacity) + assertTrue(c.read(zeroSizedBuffer), "read() should return true for zero-sized buffer"); + // Verify buffer state unchanged + assertEquals(0, zeroSizedBuffer.remaining()); + assertEquals(0, zeroSizedBuffer.position()); + assertEquals(0, zeroSizedBuffer.limit()); + } + + private static void setField(Object target, String name, Object value) + throws ReflectiveOperationException { + final Field f = RandomAccessFileChannel.class.getDeclaredField(name); + f.setAccessible(true); + f.set(target, value); + } + + private static final class TrackingRandomAccessFile extends RandomAccessFile { + private volatile boolean closed; + + private TrackingRandomAccessFile(File f) throws FileNotFoundException { + super(f, "rw"); + } + + @Override + public void close() throws IOException { + closed = true; + super.close(); + } + + private boolean isClosed() { + return closed; + } + } + + /** + * {@link FileChannel#close()} is final (inherited), so we implement a minimal {@link FileChannel} + * and throw from {@link #implCloseChannel()} to simulate close failure. + */ + private static final class FailingCloseFileChannel extends FileChannel { + @Override + protected void implCloseChannel() throws IOException { + throw new IOException("simulated close failure"); + } + + @Override + public int read(ByteBuffer dst) { + throw new UnsupportedOperationException(); + } + + @Override + public long read(ByteBuffer[] dsts, int offset, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public int write(ByteBuffer src) { + throw new UnsupportedOperationException(); + } + + @Override + public long write(ByteBuffer[] srcs, int offset, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public long position() { + throw new UnsupportedOperationException(); + } + + @Override + public FileChannel position(long newPosition) { + throw new UnsupportedOperationException(); + } + + @Override + public long size() { + throw new UnsupportedOperationException(); + } + + @Override + public FileChannel truncate(long size) { + throw new UnsupportedOperationException(); + } + + @Override + public void force(boolean metaData) { + throw new UnsupportedOperationException(); + } + + @Override + public long transferTo(long position, long count, WritableByteChannel target) { + throw new UnsupportedOperationException(); + } + + @Override + public long transferFrom(ReadableByteChannel src, long position, long count) { + throw new UnsupportedOperationException(); + } + + @Override + public int read(ByteBuffer dst, long position) { + throw new UnsupportedOperationException(); + } + + @Override + public int write(ByteBuffer src, long position) { + throw new UnsupportedOperationException(); + } + + @Override + public MappedByteBuffer map(MapMode mode, long position, long size) { + throw new UnsupportedOperationException(); + } + + @Override + public FileLock lock(long position, long size, boolean shared) { + throw new UnsupportedOperationException(); + } + + @Override + public FileLock tryLock(long position, long size, boolean shared) { + throw new UnsupportedOperationException(); + } + } +} +