diff --git a/sql-plugin-fileio/src/main/java/com/nvidia/spark/rapids/fileio/RapidsInputFiles.java b/sql-plugin-fileio/src/main/java/com/nvidia/spark/rapids/fileio/RapidsInputFiles.java new file mode 100644 index 00000000000..407bdd03fa6 --- /dev/null +++ b/sql-plugin-fileio/src/main/java/com/nvidia/spark/rapids/fileio/RapidsInputFiles.java @@ -0,0 +1,116 @@ +/* + * Copyright (c) 2026, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.fileio; + +import ai.rapids.cudf.HostMemoryBuffer; +import com.nvidia.spark.rapids.jni.fileio.RapidsInputFile; +import org.apache.hadoop.conf.Configuration; + +import java.io.IOException; +import java.net.URI; +import java.util.List; +import java.util.Objects; + +/** + * Static helpers shared by {@link RapidsInputFile} implementations. + */ +public final class RapidsInputFiles { + private static final S3PerfReader DISABLED_S3_PERF_READER = new S3PerfReader() { + @Override + public boolean isEnabled() { + return false; + } + + @Override + public boolean readVectored( + Configuration hadoopConf, + URI fileUri, + HostMemoryBuffer output, + List copyRanges) { + return false; + } + + @Override + public boolean readTail( + Configuration hadoopConf, + URI fileUri, + HostMemoryBuffer output, + long length, + long outputOffset) { + return false; + } + }; + + private static volatile S3PerfReader s3PerfReader = DISABLED_S3_PERF_READER; + + private RapidsInputFiles() {} + + /** + * Java bridge for S3 PerfIO integration. The implementation lives in sql-plugin + * because it depends on private Scala PerfIO state. + */ + public interface S3PerfReader { + boolean isEnabled(); + + boolean readVectored( + Configuration hadoopConf, + URI fileUri, + HostMemoryBuffer output, + List copyRanges) throws IOException; + + boolean readTail( + Configuration hadoopConf, + URI fileUri, + HostMemoryBuffer output, + long length, + long outputOffset) throws IOException; + } + + public static void setS3PerfReader(S3PerfReader reader) { + s3PerfReader = Objects.requireNonNull(reader, "reader can't be null"); + } + + public static void resetS3PerfReader() { + s3PerfReader = DISABLED_S3_PERF_READER; + } + + /** + * True iff the active SQL-plugin bridge says the S3 PerfIO path is enabled. + * Returns false before the bridge is registered so callers default to the + * non-PerfIO path during early bring-up. + */ + public static boolean isS3PerfEnabled() { + return s3PerfReader.isEnabled(); + } + + public static boolean readS3Vectored( + Configuration hadoopConf, + URI fileUri, + HostMemoryBuffer output, + List copyRanges) throws IOException { + return s3PerfReader.readVectored(hadoopConf, fileUri, output, copyRanges); + } + + public static boolean readS3Tail( + Configuration hadoopConf, + URI fileUri, + HostMemoryBuffer output, + long length, + long outputOffset) throws IOException { + return s3PerfReader.readTail(hadoopConf, fileUri, output, length, outputOffset); + } +} diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopFileIO.java b/sql-plugin-fileio/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopFileIO.java similarity index 75% rename from sql-plugin/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopFileIO.java rename to sql-plugin-fileio/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopFileIO.java index dd9da173280..f17730211a7 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopFileIO.java +++ b/sql-plugin-fileio/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopFileIO.java @@ -16,12 +16,10 @@ package com.nvidia.spark.rapids.fileio.hadoop; -import com.nvidia.spark.rapids.fileio.RapidsInputFiles; import com.nvidia.spark.rapids.jni.fileio.RapidsFileIO; import com.nvidia.spark.rapids.jni.fileio.RapidsInputFile; import com.nvidia.spark.rapids.jni.fileio.RapidsOutputFile; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.spark.util.SerializableConfiguration; @@ -34,9 +32,18 @@ */ public class HadoopFileIO implements RapidsFileIO { private final SerializableConfiguration hadoopConf; + private final HadoopInputFileFactory inputFileFactory; public HadoopFileIO(Configuration hadoopConf) { Objects.requireNonNull(hadoopConf, "hadoopConf can't be null"); + this.inputFileFactory = null; + this.hadoopConf = new SerializableConfiguration(hadoopConf); + } + + public HadoopFileIO(Configuration hadoopConf, HadoopInputFileFactory inputFileFactory) { + Objects.requireNonNull(hadoopConf, "hadoopConf can't be null"); + this.inputFileFactory = Objects.requireNonNull( + inputFileFactory, "inputFileFactory can't be null"); this.hadoopConf = new SerializableConfiguration(hadoopConf); } @@ -47,9 +54,9 @@ public RapidsInputFile newInputFile(String path) throws IOException { @Override public RapidsInputFile newInputFile(Path path) throws IOException { - String scheme = path.toUri().getScheme(); - if (scheme != null && scheme.startsWith("s3") && RapidsInputFiles.isS3PerfEnabled()) { - return S3InputFile.create(path, hadoopConf.value()); + Objects.requireNonNull(path, "path can't be null"); + if (inputFileFactory != null) { + return inputFileFactory.create(path, hadoopConf.value()); } return HadoopInputFile.create(path, hadoopConf.value()); } diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopInputFile.java b/sql-plugin-fileio/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopInputFile.java similarity index 98% rename from sql-plugin/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopInputFile.java rename to sql-plugin-fileio/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopInputFile.java index a1688b50be3..25ab03e2a7b 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopInputFile.java +++ b/sql-plugin-fileio/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopInputFile.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2025, NVIDIA CORPORATION. + * Copyright (c) 2025-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/sql-plugin-fileio/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopInputFileFactory.java b/sql-plugin-fileio/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopInputFileFactory.java new file mode 100644 index 00000000000..972a2ce103c --- /dev/null +++ b/sql-plugin-fileio/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopInputFileFactory.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2026, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.fileio.hadoop; + +import com.nvidia.spark.rapids.jni.fileio.RapidsInputFile; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.io.Serializable; + +/** + * Serializable extension point for callers that want to replace Hadoop input + * files with an optimized implementation for selected paths. + */ +@FunctionalInterface +public interface HadoopInputFileFactory extends Serializable { + RapidsInputFile create(Path path, Configuration conf) throws IOException; +} diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopInputStream.java b/sql-plugin-fileio/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopInputStream.java similarity index 97% rename from sql-plugin/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopInputStream.java rename to sql-plugin-fileio/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopInputStream.java index 289e6dc0355..18b3c464da8 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopInputStream.java +++ b/sql-plugin-fileio/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopInputStream.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2025, NVIDIA CORPORATION. + * Copyright (c) 2025-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopOutputFile.java b/sql-plugin-fileio/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopOutputFile.java similarity index 97% rename from sql-plugin/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopOutputFile.java rename to sql-plugin-fileio/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopOutputFile.java index 3ed1146eefa..f24f77d0fe5 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopOutputFile.java +++ b/sql-plugin-fileio/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopOutputFile.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2025, NVIDIA CORPORATION. + * Copyright (c) 2025-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopOutputStream.java b/sql-plugin-fileio/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopOutputStream.java similarity index 97% rename from sql-plugin/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopOutputStream.java rename to sql-plugin-fileio/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopOutputStream.java index 301570fba1f..9b628bc826f 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopOutputStream.java +++ b/sql-plugin-fileio/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopOutputStream.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2025, NVIDIA CORPORATION. + * Copyright (c) 2025-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/sql-plugin-fileio/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/PerfIOHadoopInputFileFactory.java b/sql-plugin-fileio/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/PerfIOHadoopInputFileFactory.java new file mode 100644 index 00000000000..2fcd6d896af --- /dev/null +++ b/sql-plugin-fileio/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/PerfIOHadoopInputFileFactory.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2026, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.fileio.hadoop; + +import com.nvidia.spark.rapids.fileio.RapidsInputFiles; +import com.nvidia.spark.rapids.jni.fileio.RapidsInputFile; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; + +/** Hadoop input factory that routes S3 paths through the registered PerfIO bridge. */ +public final class PerfIOHadoopInputFileFactory implements HadoopInputFileFactory { + public static final PerfIOHadoopInputFileFactory INSTANCE = new PerfIOHadoopInputFileFactory(); + + private PerfIOHadoopInputFileFactory() {} + + @Override + public RapidsInputFile create(Path path, Configuration conf) throws IOException { + String scheme = path.toUri().getScheme(); + if (scheme != null && scheme.startsWith("s3") && RapidsInputFiles.isS3PerfEnabled()) { + return S3InputFile.create(path, conf); + } + return HadoopInputFile.create(path, conf); + } + + private Object readResolve() { + return INSTANCE; + } +} diff --git a/sql-plugin-fileio/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/S3InputFile.java b/sql-plugin-fileio/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/S3InputFile.java new file mode 100644 index 00000000000..d9932bfda4e --- /dev/null +++ b/sql-plugin-fileio/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/S3InputFile.java @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2026, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.fileio.hadoop; + +import ai.rapids.cudf.HostMemoryBuffer; +import com.nvidia.spark.rapids.fileio.RapidsInputFiles; +import com.nvidia.spark.rapids.jni.fileio.RapidsInputFile; +import com.nvidia.spark.rapids.jni.fileio.SeekableInputStream; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.net.URI; +import java.util.List; +import java.util.OptionalLong; + +/** + * S3-backed {@link RapidsInputFile} for Hadoop-conf-driven (non-iceberg) reads. + * {@code readVectored} issues batched byte-range GETs through the optimized + * vectored-read path; the other operations delegate to the standard + * {@link HadoopInputFile}. + */ +public class S3InputFile implements RapidsInputFile { + private final HadoopInputFile delegate; + private final URI fileUri; + private final Configuration hadoopConf; + + public static S3InputFile create(Path filePath, Configuration conf) throws IOException { + return new S3InputFile(HadoopInputFile.create(filePath, conf), filePath.toUri(), conf); + } + + private S3InputFile(HadoopInputFile delegate, URI fileUri, Configuration hadoopConf) { + this.delegate = delegate; + this.fileUri = fileUri; + this.hadoopConf = hadoopConf; + } + + @Override + public String path() { + return delegate.path(); + } + + @Override + public long getLength() throws IOException { + return delegate.getLength(); + } + + @Override + public OptionalLong getLastModificationTime() throws IOException { + return delegate.getLastModificationTime(); + } + + @Override + public SeekableInputStream open() throws IOException { + return delegate.open(); + } + + @Override + public void readVectored(HostMemoryBuffer output, List copyRanges) + throws IOException { + if (!RapidsInputFiles.readS3Vectored(hadoopConf, fileUri, output, copyRanges)) { + throw new IllegalArgumentException("expected to use PerfIO to read"); + } + } + + /** + * Issue a single suffix-range {@code GetObject} ({@code Range: bytes=-N}) for + * the last {@code length} bytes. Avoids the {@code getLength()} round-trip the + * default {@link RapidsInputFile#readTail} would make. + */ + @Override + public void readTail(long length, HostMemoryBuffer output) throws IOException { + if (length == 0) { + return; + } + if (length < 0) { + throw new IllegalArgumentException("length must be non-negative"); + } + if (!RapidsInputFiles.readS3Tail(hadoopConf, fileUri, output, length, 0L)) { + throw new IllegalArgumentException("expected to use PerfIO to read"); + } + } +} diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/fileio/RapidsInputFiles.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/fileio/RapidsInputFiles.java deleted file mode 100644 index e30bbce09b3..00000000000 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/fileio/RapidsInputFiles.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2026, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.fileio; - -import com.nvidia.spark.rapids.PerfIOConf; -import org.apache.spark.SparkEnv; - -/** - * Static helpers shared by {@link com.nvidia.spark.rapids.jni.fileio.RapidsInputFile} - * implementations. - */ -public final class RapidsInputFiles { - private RapidsInputFiles() {} - - /** - * True iff {@code spark.rapids.perfio.s3.enabled} is set to {@code true} on - * the active SparkConf. Returns false when no {@link SparkEnv} is initialized - * (e.g. before driver bring-up) so callers default to the non-PerfIO path. - */ - public static boolean isS3PerfEnabled() { - SparkEnv env = SparkEnv.get(); - if (env == null) { - return false; - } - return env.conf().getBoolean(PerfIOConf.S3PERF_ENABLED().key(), false); - } -} diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/PerfIOS3Reader.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/PerfIOS3Reader.java new file mode 100644 index 00000000000..5277c382ee0 --- /dev/null +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/PerfIOS3Reader.java @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2026, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.fileio.hadoop; + +import ai.rapids.cudf.HostMemoryBuffer; +import com.nvidia.spark.rapids.IntRangeWithOffset; +import com.nvidia.spark.rapids.PerfIO$; +import com.nvidia.spark.rapids.PerfIOConf; +import com.nvidia.spark.rapids.RangeWithOffset; +import com.nvidia.spark.rapids.SuffixRangeWithOffset; +import com.nvidia.spark.rapids.fileio.RapidsInputFiles.S3PerfReader; +import com.nvidia.spark.rapids.jni.fileio.RapidsInputFile; +import org.apache.hadoop.conf.Configuration; +import org.apache.spark.SparkEnv; +import scala.Option; +import scala.collection.JavaConverters; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; + +/** SQL-plugin bridge from Java-only file I/O classes to private Scala PerfIO state. */ +public final class PerfIOS3Reader implements S3PerfReader { + public static final PerfIOS3Reader INSTANCE = new PerfIOS3Reader(); + + private PerfIOS3Reader() {} + + @Override + public boolean isEnabled() { + SparkEnv env = SparkEnv.get(); + if (env == null) { + return false; + } + return env.conf().getBoolean(PerfIOConf.S3PERF_ENABLED().key(), false); + } + + @Override + public boolean readVectored( + Configuration hadoopConf, + URI fileUri, + HostMemoryBuffer output, + List copyRanges) throws IOException { + List ranges = new ArrayList<>(copyRanges.size()); + for (RapidsInputFile.CopyRange range : copyRanges) { + ranges.add(new IntRangeWithOffset( + range.getInputOffset(), range.getLength(), range.getOutputOffset())); + } + return readToHostMemory(hadoopConf, fileUri, output, ranges); + } + + @Override + public boolean readTail( + Configuration hadoopConf, + URI fileUri, + HostMemoryBuffer output, + long length, + long outputOffset) throws IOException { + List ranges = new ArrayList<>(1); + ranges.add(new SuffixRangeWithOffset(length, outputOffset)); + return readToHostMemory(hadoopConf, fileUri, output, ranges); + } + + private boolean readToHostMemory( + Configuration hadoopConf, + URI fileUri, + HostMemoryBuffer output, + List ranges) { + Option result = PerfIO$.MODULE$.readToHostMemory( + hadoopConf, + output, + fileUri, + () -> JavaConverters.asScalaBufferConverter(ranges).asScala().toSeq()); + return result.isDefined(); + } +} diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/fileio/hadoop/S3InputFile.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/fileio/hadoop/S3InputFile.scala deleted file mode 100644 index 3a8c7ed9185..00000000000 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/fileio/hadoop/S3InputFile.scala +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Copyright (c) 2026, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.fileio.hadoop - -import java.io.IOException -import java.net.URI -import java.util.OptionalLong - -import scala.collection.JavaConverters._ - -import ai.rapids.cudf.HostMemoryBuffer -import com.nvidia.spark.rapids.{IntRangeWithOffset, PerfIO, RangeWithOffset, SuffixRangeWithOffset} -import com.nvidia.spark.rapids.jni.fileio.{RapidsInputFile, SeekableInputStream} -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path - -/** - * S3-backed {@link RapidsInputFile} for Hadoop-conf-driven (non-iceberg) reads. - * {@code readVectored} issues batched byte-range GETs through the optimized - * vectored-read path; the other operations delegate to the standard - * {@link HadoopInputFile}. - */ -class S3InputFile private ( - delegate: HadoopInputFile, - fileUri: URI, - hadoopConf: Configuration) - extends RapidsInputFile { - - override def path(): String = delegate.path() - - @throws[IOException] - override def getLength(): Long = delegate.getLength() - - @throws[IOException] - override def getLastModificationTime(): OptionalLong = delegate.getLastModificationTime() - - @throws[IOException] - override def open(): SeekableInputStream = delegate.open() - - @throws[IOException] - override def readVectored( - output: HostMemoryBuffer, - copyRanges: java.util.List[RapidsInputFile.CopyRange]): Unit = { - val ranges = copyRanges.asScala.map { r => - IntRangeWithOffset(r.getInputOffset, r.getLength, r.getOutputOffset) - }.toSeq - require( - PerfIO.readToHostMemory(hadoopConf, output, fileUri, ranges).isDefined, - "expected to use PerfIO to read") - } - - /** - * Issue a single suffix-range {@code GetObject} ({@code Range: bytes=-N}) for - * the last {@code length} bytes. Avoids the {@code getLength()} round-trip the - * default {@link RapidsInputFile#readTail} would make. - */ - @throws[IOException] - override def readTail(length: Long, output: HostMemoryBuffer): Unit = { - if (length == 0) { - return - } - if (length < 0) { - throw new IllegalArgumentException("length must be non-negative") - } - val ranges = Seq[RangeWithOffset](SuffixRangeWithOffset(length, /*destOffset*/ 0L)) - require( - PerfIO.readToHostMemory(hadoopConf, output, fileUri, ranges).isDefined, - "expected to use PerfIO to read") - } -} - -object S3InputFile { - @throws[IOException] - def create(filePath: Path, conf: Configuration): S3InputFile = { - new S3InputFile(HadoopInputFile.create(filePath, conf), filePath.toUri, conf) - } -}