Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright (c) 2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.fileio;

import ai.rapids.cudf.HostMemoryBuffer;
import com.nvidia.spark.rapids.jni.fileio.RapidsInputFile;
import org.apache.hadoop.conf.Configuration;

import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Objects;

/**
 * Static utilities shared by {@link RapidsInputFile} implementations.
 *
 * <p>The class keeps a single volatile {@link S3PerfReader} reference. It starts out as a
 * built-in disabled reader whose {@code isEnabled()} and read methods all return
 * {@code false}; {@link #setS3PerfReader(S3PerfReader)} installs another reader and
 * {@link #resetS3PerfReader()} restores the disabled one.
 */
public final class RapidsInputFiles {

  /**
   * Java bridge for S3 PerfIO integration. The implementation lives in sql-plugin
   * because it depends on private Scala PerfIO state.
   */
  public interface S3PerfReader {
    /** Reports whether this bridge considers the S3 PerfIO path enabled. */
    boolean isEnabled();

    /**
     * Vectored read of {@code copyRanges} from {@code fileUri} into {@code output}.
     *
     * @return a flag defined by the implementation; the built-in disabled reader always
     *     answers {@code false}
     * @throws IOException if the implementation fails while reading
     */
    boolean readVectored(
        Configuration hadoopConf,
        URI fileUri,
        HostMemoryBuffer output,
        List<RapidsInputFile.CopyRange> copyRanges) throws IOException;

    /**
     * Tail read of {@code length} bytes from {@code fileUri} into {@code output}, with
     * {@code outputOffset} forwarded unchanged to the implementation.
     *
     * @return a flag defined by the implementation; the built-in disabled reader always
     *     answers {@code false}
     * @throws IOException if the implementation fails while reading
     */
    boolean readTail(
        Configuration hadoopConf,
        URI fileUri,
        HostMemoryBuffer output,
        long length,
        long outputOffset) throws IOException;
  }

  /** Reader used while no bridge is registered: never enabled, never reads. */
  private static final S3PerfReader DISABLED_S3_PERF_READER = new S3PerfReader() {
    @Override
    public boolean isEnabled() {
      return false;
    }

    @Override
    public boolean readVectored(
        Configuration hadoopConf,
        URI fileUri,
        HostMemoryBuffer output,
        List<RapidsInputFile.CopyRange> copyRanges) {
      return false;
    }

    @Override
    public boolean readTail(
        Configuration hadoopConf,
        URI fileUri,
        HostMemoryBuffer output,
        long length,
        long outputOffset) {
      return false;
    }
  };

  /** Currently active reader; volatile so a newly installed reader is visible to all threads. */
  private static volatile S3PerfReader s3PerfReader = DISABLED_S3_PERF_READER;

  private RapidsInputFiles() {}

  /**
   * Installs {@code reader} as the active bridge.
   *
   * @param reader the bridge to use from now on
   * @throws NullPointerException if {@code reader} is {@code null}
   */
  public static void setS3PerfReader(S3PerfReader reader) {
    Objects.requireNonNull(reader, "reader can't be null");
    s3PerfReader = reader;
  }

  /** Re-installs the built-in disabled reader. */
  public static void resetS3PerfReader() {
    s3PerfReader = DISABLED_S3_PERF_READER;
  }

  /**
   * True iff the active SQL-plugin bridge says the S3 PerfIO path is enabled.
   * Returns false before the bridge is registered so callers default to the
   * non-PerfIO path during early bring-up.
   */
  public static boolean isS3PerfEnabled() {
    final S3PerfReader active = s3PerfReader;
    return active.isEnabled();
  }

  /**
   * Forwards a vectored read to the active reader and returns its answer unchanged.
   *
   * @throws IOException if the active reader throws it
   */
  public static boolean readS3Vectored(
      Configuration hadoopConf,
      URI fileUri,
      HostMemoryBuffer output,
      List<RapidsInputFile.CopyRange> copyRanges) throws IOException {
    final S3PerfReader active = s3PerfReader;
    return active.readVectored(hadoopConf, fileUri, output, copyRanges);
  }

  /**
   * Forwards a tail read to the active reader and returns its answer unchanged.
   *
   * @throws IOException if the active reader throws it
   */
  public static boolean readS3Tail(
      Configuration hadoopConf,
      URI fileUri,
      HostMemoryBuffer output,
      long length,
      long outputOffset) throws IOException {
    final S3PerfReader active = s3PerfReader;
    return active.readTail(hadoopConf, fileUri, output, length, outputOffset);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@

package com.nvidia.spark.rapids.fileio.hadoop;

import com.nvidia.spark.rapids.fileio.RapidsInputFiles;
import com.nvidia.spark.rapids.jni.fileio.RapidsFileIO;
import com.nvidia.spark.rapids.jni.fileio.RapidsInputFile;
import com.nvidia.spark.rapids.jni.fileio.RapidsOutputFile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.util.SerializableConfiguration;

Expand All @@ -34,9 +32,18 @@
*/
public class HadoopFileIO implements RapidsFileIO {
private final SerializableConfiguration hadoopConf;
private final HadoopInputFileFactory inputFileFactory;

public HadoopFileIO(Configuration hadoopConf) {
Objects.requireNonNull(hadoopConf, "hadoopConf can't be null");
this.inputFileFactory = null;
this.hadoopConf = new SerializableConfiguration(hadoopConf);
}

public HadoopFileIO(Configuration hadoopConf, HadoopInputFileFactory inputFileFactory) {
Objects.requireNonNull(hadoopConf, "hadoopConf can't be null");
this.inputFileFactory = Objects.requireNonNull(
inputFileFactory, "inputFileFactory can't be null");
this.hadoopConf = new SerializableConfiguration(hadoopConf);
}

Expand All @@ -47,9 +54,9 @@ public RapidsInputFile newInputFile(String path) throws IOException {

@Override
public RapidsInputFile newInputFile(Path path) throws IOException {
String scheme = path.toUri().getScheme();
if (scheme != null && scheme.startsWith("s3") && RapidsInputFiles.isS3PerfEnabled()) {
return S3InputFile.create(path, hadoopConf.value());
Objects.requireNonNull(path, "path can't be null");
if (inputFileFactory != null) {
return inputFileFactory.create(path, hadoopConf.value());
}
return HadoopInputFile.create(path, hadoopConf.value());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2025, NVIDIA CORPORATION.
* Copyright (c) 2025-2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.fileio.hadoop;

import com.nvidia.spark.rapids.jni.fileio.RapidsInputFile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.io.Serializable;

/**
* Serializable extension point for callers that want to replace Hadoop input
* files with an optimized implementation for selected paths.
*/
@FunctionalInterface
public interface HadoopInputFileFactory extends Serializable {
/**
* Creates the {@link RapidsInputFile} to use for the given path.
*
* @param path location of the file to open
* @param conf Hadoop configuration supplied by the caller
* @return the input file chosen by this factory for {@code path}
* @throws IOException if the implementation fails to create the input file
*/
RapidsInputFile create(Path path, Configuration conf) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2025, NVIDIA CORPORATION.
* Copyright (c) 2025-2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2025, NVIDIA CORPORATION.
* Copyright (c) 2025-2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2025, NVIDIA CORPORATION.
* Copyright (c) 2025-2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.fileio.hadoop;

import com.nvidia.spark.rapids.fileio.RapidsInputFiles;
import com.nvidia.spark.rapids.jni.fileio.RapidsInputFile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

/**
 * {@link HadoopInputFileFactory} singleton that routes S3 paths through the registered PerfIO
 * bridge and every other path through the plain {@link HadoopInputFile}.
 */
public final class PerfIOHadoopInputFileFactory implements HadoopInputFileFactory {
  /** The only instance; {@code readResolve} maps deserialized copies back to it. */
  public static final PerfIOHadoopInputFileFactory INSTANCE = new PerfIOHadoopInputFileFactory();

  private PerfIOHadoopInputFileFactory() {}

  /**
   * Picks the input file implementation for {@code path}.
   *
   * @param path file location; its URI scheme decides the routing
   * @param conf Hadoop configuration passed on to the chosen implementation
   * @return an {@link S3InputFile} when the scheme starts with {@code "s3"} and
   *     {@link RapidsInputFiles#isS3PerfEnabled()} is true, otherwise a {@link HadoopInputFile}
   * @throws IOException if the chosen implementation fails to create the file
   */
  @Override
  public RapidsInputFile create(Path path, Configuration conf) throws IOException {
    final String uriScheme = path.toUri().getScheme();
    final boolean looksLikeS3 = uriScheme != null && uriScheme.startsWith("s3");
    // The enabled flag is consulted only for S3-like schemes, thanks to short-circuiting.
    if (!looksLikeS3 || !RapidsInputFiles.isS3PerfEnabled()) {
      return HadoopInputFile.create(path, conf);
    }
    return S3InputFile.create(path, conf);
  }

  /** Keeps the singleton property across Java serialization. */
  private Object readResolve() {
    return INSTANCE;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright (c) 2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.fileio.hadoop;

import ai.rapids.cudf.HostMemoryBuffer;
import com.nvidia.spark.rapids.fileio.RapidsInputFiles;
import com.nvidia.spark.rapids.jni.fileio.RapidsInputFile;
import com.nvidia.spark.rapids.jni.fileio.SeekableInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.OptionalLong;

/**
* S3-backed {@link RapidsInputFile} for Hadoop-conf-driven (non-iceberg) reads.
* {@code readVectored} issues batched byte-range GETs through the optimized
* vectored-read path; the other operations delegate to the standard
* {@link HadoopInputFile}.
*/
public class S3InputFile implements RapidsInputFile {
private final HadoopInputFile delegate;
private final URI fileUri;
private final Configuration hadoopConf;

/**
 * Builds an {@code S3InputFile} for {@code filePath}, backed by a {@link HadoopInputFile}
 * created from the same path and configuration.
 *
 * @param filePath file location; also converted to the URI used for reads
 * @param conf Hadoop configuration used for the delegate and for later reads
 * @return the new input file
 * @throws IOException if creating the delegate {@link HadoopInputFile} fails
 */
public static S3InputFile create(Path filePath, Configuration conf) throws IOException {
  final HadoopInputFile hadoopFile = HadoopInputFile.create(filePath, conf);
  final URI uri = filePath.toUri();
  return new S3InputFile(hadoopFile, uri, conf);
}

/**
 * Stores the collaborators as given; use {@link #create(Path, Configuration)} to obtain an
 * instance.
 */
private S3InputFile(HadoopInputFile delegate, URI fileUri, Configuration hadoopConf) {
  this.hadoopConf = hadoopConf;
  this.fileUri = fileUri;
  this.delegate = delegate;
}

/**
* Returns the path reported by the wrapped {@link HadoopInputFile}.
*/
@Override
public String path() {
return delegate.path();
}

/**
* Returns the length reported by the wrapped {@link HadoopInputFile}.
*
* @throws IOException if the delegate throws it
*/
@Override
public long getLength() throws IOException {
return delegate.getLength();
}

/**
* Returns the last modification time reported by the wrapped {@link HadoopInputFile}.
*
* @throws IOException if the delegate throws it
*/
@Override
public OptionalLong getLastModificationTime() throws IOException {
return delegate.getLastModificationTime();
}

/**
* Opens the file through the wrapped {@link HadoopInputFile}; this path does not go through
* {@link RapidsInputFiles}.
*
* @throws IOException if the delegate throws it
*/
@Override
public SeekableInputStream open() throws IOException {
return delegate.open();
}

/**
* Hands the vectored read to {@link RapidsInputFiles#readS3Vectored} together with the
* stored Hadoop configuration and file URI.
*
* @param output buffer passed through to the bridge
* @param copyRanges ranges passed through to the bridge
* @throws IllegalArgumentException if the bridge call returns {@code false}
* @throws IOException if the bridge call throws it
*/
@Override
public void readVectored(HostMemoryBuffer output, List<RapidsInputFile.CopyRange> copyRanges)
throws IOException {
if (!RapidsInputFiles.readS3Vectored(hadoopConf, fileUri, output, copyRanges)) {
throw new IllegalArgumentException("expected to use PerfIO to read");
}
Comment on lines +74 to +77

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 IllegalArgumentException escapes from a method declared throws IOException

Both readVectored and readTail throw IllegalArgumentException when the PerfIO bridge returns false. Because IllegalArgumentException is an unchecked exception, it will propagate straight through callers that only have a catch (IOException e) guard — the error will be invisible at every layer that handles I/O failures. The Scala original used require() with the same semantics, but in Java the disparity between the declared checked signature and the actual unchecked throw is more surprising. Wrapping in an IOException (throw new IOException("expected to use PerfIO to read")) would align intent with the declared signature and ensure the failure is caught by any standard I/O error handler.

}

/**
 * Reads the last {@code length} bytes of the file with one suffix-range {@code GetObject}
 * request ({@code Range: bytes=-N}), which spares the {@code getLength()} round-trip that
 * the default {@link RapidsInputFile#readTail} would make.
 *
 * <p>A {@code length} of zero is a no-op. Otherwise the read is handed to
 * {@link RapidsInputFiles#readS3Tail} with an output offset of {@code 0}.
 *
 * @param length number of trailing bytes to read; must not be negative
 * @param output buffer passed through to the bridge
 * @throws IllegalArgumentException if {@code length} is negative, or if the bridge call
 *     returns {@code false}
 * @throws IOException if the bridge call throws it
 */
@Override
public void readTail(long length, HostMemoryBuffer output) throws IOException {
  if (length < 0) {
    throw new IllegalArgumentException("length must be non-negative");
  }
  if (length == 0) {
    // Nothing to fetch, so the bridge is not called at all.
    return;
  }
  final boolean handledByPerfIO =
      RapidsInputFiles.readS3Tail(hadoopConf, fileUri, output, length, 0L);
  if (!handledByPerfIO) {
    throw new IllegalArgumentException("expected to use PerfIO to read");
  }
}
}
Loading
Loading