Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class GpuReaderFactory(private val metrics: Map[String, GpuMetric],
queryUsesInputFile || hasFilePathMetadata || hasRowPositionMetadata ||
!hasNoDeletes
MultiThread(poolConfBuilder, partition.maxNumParquetFilesParallel,
CombineConf(combineThresholdSize, combineWaitTime),
new CombineConf(combineThresholdSize, combineWaitTime),
disableCombining,
hasFilePathMetadata,
hasRowPositionMetadata)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright (c) 2024-2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids;

import java.io.Serializable;
import java.util.Objects;

public class AutoCloseableTargetSize implements AutoCloseable, Serializable {
private static final long serialVersionUID = 1L;

public final long targetSize;
public final long minSize;
public final long dataSize;

public AutoCloseableTargetSize(long targetSize, long minSize) {
this(targetSize, minSize, 0);
}

public AutoCloseableTargetSize(long targetSize, long minSize, long dataSize) {
this.targetSize = targetSize;
this.minSize = minSize;
this.dataSize = dataSize;
}

public long targetSize() {
return targetSize;
}

public long minSize() {
return minSize;
}

public long dataSize() {
return dataSize;
}

@Override
public void close() {
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (!(other instanceof AutoCloseableTargetSize)) {
return false;
}
AutoCloseableTargetSize that = (AutoCloseableTargetSize) other;
return targetSize == that.targetSize &&
minSize == that.minSize &&
dataSize == that.dataSize;
}

@Override
public int hashCode() {
return Objects.hash(targetSize, minSize, dataSize);
}

@Override
public String toString() {
return "AutoCloseableTargetSize(" + targetSize + "," + minSize + "," + dataSize + ")";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright (c) 2024-2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids;

import java.io.Serializable;
import java.util.Objects;

public class CombineConf implements Serializable {
private static final long serialVersionUID = 1L;

private final long combineThresholdSize;
private final int combineWaitTime;

public CombineConf(long combineThresholdSize, int combineWaitTime) {
this.combineThresholdSize = combineThresholdSize;
this.combineWaitTime = combineWaitTime;
}

public long combineThresholdSize() {
return combineThresholdSize;
}

public int combineWaitTime() {
return combineWaitTime;
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (!(other instanceof CombineConf)) {
return false;
}
CombineConf that = (CombineConf) other;
return combineThresholdSize == that.combineThresholdSize &&
combineWaitTime == that.combineWaitTime;
}

@Override
public int hashCode() {
return Objects.hash(combineThresholdSize, combineWaitTime);
}

@Override
public String toString() {
return "CombineConf(" + combineThresholdSize + "," + combineWaitTime + ")";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright (c) 2024-2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids;

import java.util.Objects;

public class DefaultThreadPoolConf implements ThreadPoolConf {
private static final long serialVersionUID = 1L;

private final int maxThreadNumber;
private final boolean stageLevelPool;

public DefaultThreadPoolConf(int maxThreadNumber, boolean stageLevelPool) {
this.maxThreadNumber = maxThreadNumber;
this.stageLevelPool = stageLevelPool;
}

@Override
public int maxThreadNumber() {
return maxThreadNumber;
}

@Override
public boolean stageLevelPool() {
return stageLevelPool;
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (!(other instanceof DefaultThreadPoolConf)) {
return false;
}
DefaultThreadPoolConf that = (DefaultThreadPoolConf) other;
return maxThreadNumber == that.maxThreadNumber &&
stageLevelPool == that.stageLevelPool;
}

@Override
public int hashCode() {
return Objects.hash(maxThreadNumber, stageLevelPool);
}

@Override
public String toString() {
return "DefaultThreadPoolConf(" + maxThreadNumber + "," + stageLevelPool + ")";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright (c) 2024-2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids;

import ai.rapids.cudf.BaseDeviceMemoryBuffer;
import ai.rapids.cudf.DeviceMemoryBuffer;

public final class DeviceBuffersUtils {
private DeviceBuffersUtils() {}

public static BaseDeviceMemoryBuffer[] incRefCount(BaseDeviceMemoryBuffer[] bufs) {
BaseDeviceMemoryBuffer[] ret = new BaseDeviceMemoryBuffer[bufs.length];
int initialized = 0;
try {
for (BaseDeviceMemoryBuffer buf : bufs) {
buf.incRefCount();
ret[initialized] = buf;
initialized++;
}
return ret;
} catch (Throwable t) {
closeAll(ret, initialized, t);
throw t;
}
}

public static DeviceMemoryBuffer[] allocateBuffers(long[] bufSizes) {
DeviceMemoryBuffer[] ret = new DeviceMemoryBuffer[bufSizes.length];
int initialized = 0;
try (DeviceMemoryBuffer singleBuf = DeviceMemoryBuffer.allocate(sum(bufSizes))) {
long curPos = 0L;
for (long len : bufSizes) {
ret[initialized] = singleBuf.slice(curPos, len);
initialized++;
curPos += len;
}
return ret;
} catch (Throwable t) {
closeAll(ret, initialized, t);
throw t;
}
}

private static long sum(long[] values) {
long ret = 0L;
for (long value : values) {
ret += value;
}
return ret;
}

private static void closeAll(AutoCloseable[] values, int count, Throwable cause) {
for (int i = 0; i < count; i++) {
AutoCloseable value = values[i];
if (value != null) {
try {
value.close();
} catch (Throwable t) {
cause.addSuppressed(t);
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2025-2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids;

import java.lang.management.ManagementFactory;

import ai.rapids.cudf.Cuda;
import ai.rapids.cudf.CudaComputeMode;

/**
* Caches executor-related information. Values are initialized lazily to match the previous Scala
* object semantics.
*/
final class ExecutorCache {
private ExecutorCache() {
}

static CudaComputeMode getCurrentDeviceComputeMode() {
return CurrentDeviceComputeModeHolder.VALUE;
}

static byte[] getCurrentDeviceUuid() {
return CurrentDeviceUuidHolder.VALUE;
}

static String getProcessName() {
return ProcessNameHolder.VALUE;
}

private static final class CurrentDeviceComputeModeHolder {
private static final CudaComputeMode VALUE = Cuda.getComputeMode();
}

private static final class CurrentDeviceUuidHolder {
private static final byte[] VALUE = Cuda.getGpuUuid();
}

private static final class ProcessNameHolder {
private static final String VALUE = ManagementFactory.getRuntimeMXBean().getName();
}
}
Loading