Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,13 +29,32 @@ import org.apache.hadoop.fs.permission.{FsAction, FsPermission}

import org.apache.spark.SparkContext
import org.apache.spark.api.plugin.PluginContext
import org.apache.spark.internal.Logging
import org.apache.spark.io.CompressionCodec
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.rapids.execution.TrampolineUtil
import org.apache.spark.util.SerializableConfiguration

object GpuCoreDumpHandler extends Logging {
object GpuCoreDumpHandler {
private val log = org.slf4j.LoggerFactory.getLogger(getClass.getName.stripSuffix("$"))

/** Emits `msg` at INFO level; the by-name message is only evaluated when INFO is enabled. */
private def logInfo(msg: => String): Unit =
  if (log.isInfoEnabled) log.info(msg)

/**
 * Emits `msg` together with `throwable` at WARN level.
 *
 * `msg` is a by-name parameter, so it is guarded by the level check: the message string is
 * only built when WARN is actually enabled, consistent with the guarded `logInfo` helper.
 *
 * @param msg       lazily evaluated log message
 * @param throwable the exception to attach to the log record
 */
private def logWarning(msg: => String, throwable: Throwable): Unit = {
  if (log.isWarnEnabled) {
    log.warn(msg, throwable)
  }
}

/**
 * Emits `msg` at ERROR level.
 *
 * `msg` is a by-name parameter, so it is guarded by the level check: the message string is
 * only built when ERROR is actually enabled, consistent with the guarded `logInfo` helper.
 *
 * @param msg lazily evaluated log message
 */
private def logError(msg: => String): Unit = {
  if (log.isErrorEnabled) {
    log.error(msg)
  }
}

/**
 * Emits `msg` together with `throwable` at ERROR level.
 *
 * `msg` is a by-name parameter, so it is guarded by the level check: the message string is
 * only built when ERROR is actually enabled, consistent with the guarded `logInfo` helper.
 *
 * @param msg       lazily evaluated log message
 * @param throwable the exception to attach to the log record
 */
private def logError(msg: => String, throwable: Throwable): Unit = {
  if (log.isErrorEnabled) {
    log.error(msg, throwable)
  }
}

private var executor: Option[ExecutorService] = None
private var dumpedPath: Option[String] = None
private var namedPipeFile: File = _
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import com.nvidia.spark.rapids.jni.RmmSpark
import com.nvidia.spark.rapids.spill.SpillFramework

import org.apache.spark.{SparkConf, SparkEnv, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.resource.ResourceInformation
import org.apache.spark.sql.internal.SQLConf
Expand All @@ -38,7 +37,29 @@ private case object Initialized extends MemoryState
private case object Uninitialized extends MemoryState
private case object Errored extends MemoryState

object GpuDeviceManager extends Logging {
object GpuDeviceManager {
private val log = org.slf4j.LoggerFactory.getLogger(getClass.getName.stripSuffix("$"))

/** Emits `msg` at DEBUG level; the by-name message is only evaluated when DEBUG is enabled. */
private def logDebug(msg: => String): Unit =
  if (log.isDebugEnabled) log.debug(msg)

/** Emits `msg` at INFO level; the by-name message is only evaluated when INFO is enabled. */
private def logInfo(msg: => String): Unit =
  if (log.isInfoEnabled) log.info(msg)

/**
 * Emits `msg` at WARN level.
 *
 * `msg` is a by-name parameter, so it is guarded by the level check: the message string is
 * only built when WARN is actually enabled, consistent with the guarded `logDebug` and
 * `logInfo` helpers.
 *
 * @param msg lazily evaluated log message
 */
private def logWarning(msg: => String): Unit = {
  if (log.isWarnEnabled) {
    log.warn(msg)
  }
}

/**
 * Emits `msg` at ERROR level.
 *
 * `msg` is a by-name parameter, so it is guarded by the level check: the message string is
 * only built when ERROR is actually enabled, consistent with the guarded `logDebug` and
 * `logInfo` helpers.
 *
 * @param msg lazily evaluated log message
 */
private def logError(msg: => String): Unit = {
  if (log.isErrorEnabled) {
    log.error(msg)
  }
}

// This config controls whether RMM/Pinned memory are initialized from the task
// or from the executor side plugin. The default is to initialize from the
// executor plugin.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ class GetJsonObjectCombiner(private val exp: GpuGetJsonObject) extends GpuExpres
override def addExpression(e: Expression): Unit = {
val localOutputLocation = outputLocation
outputLocation += 1
val key = GpuExpressionEquals(e)
val key = new GpuExpressionEquals(e)
if (!toCombine.contains(key)) {
toCombine.put(key, localOutputLocation)
}
Expand Down Expand Up @@ -329,7 +329,7 @@ class GetJsonObjectCombiner(private val exp: GpuGetJsonObject) extends GpuExpres
}

override def getReplacementExpression(e: Expression): Option[Expression] = {
toCombine.get(GpuExpressionEquals(e)).map { localId =>
toCombine.get(new GpuExpressionEquals(e)).map { localId =>
GpuGetStructField(multiGet, localId, Some(fieldName(localId)))
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
* Copyright (c) 2021-2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,15 +24,30 @@ import ai.rapids.cudf.Cuda

import org.apache.spark.SparkConf
import org.apache.spark.api.resource.ResourceDiscoveryPlugin
import org.apache.spark.internal.Logging
import org.apache.spark.resource.{ResourceInformation, ResourceRequest}

/**
* Note, this class should not be referenced directly in source code.
* It should be loaded by reflection using ShimLoader.newInstanceOf, see ./docs/dev/shims.md
*/
protected class InternalExclusiveModeGpuDiscoveryPlugin
extends ResourceDiscoveryPlugin with Logging {
extends ResourceDiscoveryPlugin {

private val log = org.slf4j.LoggerFactory.getLogger(
classOf[InternalExclusiveModeGpuDiscoveryPlugin])

/** Emits `msg` at INFO level; the by-name message is only evaluated when INFO is enabled. */
private def logInfo(msg: => String): Unit =
  if (log.isInfoEnabled) log.info(msg)

/** Emits `msg` at WARN level; the by-name message is only evaluated when WARN is enabled. */
private def logWarning(msg: => String): Unit =
  if (log.isWarnEnabled) log.warn(msg)

override def discoverResource(
request: ResourceRequest,
sparkconf: SparkConf
Expand Down
10 changes: 6 additions & 4 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/literals.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2025, NVIDIA CORPORATION.
* Copyright (c) 2019-2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,7 +16,8 @@

package com.nvidia.spark.rapids

import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Long => JLong, Short => JShort}
import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat,
Long => JLong, Short => JShort}
import java.math.BigInteger
import java.time.{LocalDate, OffsetDateTime}
import java.util
Expand All @@ -31,7 +32,7 @@ import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingArray
import com.nvidia.spark.rapids.shims.{GpuTypeShims, SparkShimImpl}
import org.apache.commons.codec.binary.{Hex => ApacheHex}
import org.json4s.JsonAST.{JField, JNull, JString}
import org.json4s.JsonAST.{JField, JNull, JString, JValue}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
Expand Down Expand Up @@ -685,7 +686,8 @@ case class GpuLiteral (value: Any, dataType: DataType) extends GpuLeafExpression
case (l: Long, TimestampType) => JString(DateTimeUtils.toJavaTimestamp(l).toString)
case (other, _) => JString(other.toString)
}
("value" -> jsonValue) :: ("dataType" -> TrampolineUtil.jsonValue(dataType)) :: Nil
("value" -> jsonValue) ::
("dataType" -> TrampolineUtil.jsonValue(dataType).asInstanceOf[JValue]) :: Nil
}

override def sql: String = (value, dataType) match {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2025, NVIDIA CORPORATION.
* Copyright (c) 2025-2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,11 +27,11 @@ import org.apache.spark.sql.types.DataType
* @param argOffsets The offsets of the original arguments in "flattenedArgs"
* @param argNames The optional argument names
*/
case class GpuPythonArguments(
flattenedArgs: Seq[Expression],
flattenedTypes: Seq[DataType],
argOffsets: Array[Array[Int]],
argNames: Option[Array[Array[Option[String]]]])
class GpuPythonArguments(
val flattenedArgs: Seq[Expression],
val flattenedTypes: Seq[DataType],
val argOffsets: Array[Array[Int]],
val argNames: Option[Array[Array[Option[String]]]])

/** Gpu version of ArgumentMetadata */
case class GpuArgumentMeta(offset: Int, name: Option[String])
class GpuArgumentMeta(val offset: Int, val name: Option[String])
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import com.nvidia.spark.rapids.python.PythonConfEntries.CONCURRENT_PYTHON_WORKER
import org.apache.commons.lang3.mutable.MutableInt

import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.internal.Logging

/*
* PythonWorkerSemaphore is used to limit the number of Python workers (processes) to be started
Expand All @@ -41,7 +40,15 @@ import org.apache.spark.internal.Logging
* the inner semaphore when no longer needed.
*
*/
object PythonWorkerSemaphore extends Logging {
object PythonWorkerSemaphore {
private val log = org.slf4j.LoggerFactory.getLogger(
"com.nvidia.spark.rapids.python.PythonWorkerSemaphore")

/** Emits `msg` at DEBUG level; the by-name message is only evaluated when DEBUG is enabled. */
private def logDebug(msg: => String): Unit =
  if (log.isDebugEnabled) log.debug(msg)

private lazy val rapidsConf = new RapidsConf(SparkEnv.get.conf)
private lazy val workersPerGpu = rapidsConf.get(CONCURRENT_PYTHON_WORKERS)
Expand Down Expand Up @@ -97,7 +104,15 @@ object PythonWorkerSemaphore extends Logging {
}
}

private final class PythonWorkerSemaphore(tasksPerGpu: Int) extends Logging {
private final class PythonWorkerSemaphore(tasksPerGpu: Int) {
private val log = org.slf4j.LoggerFactory.getLogger(classOf[PythonWorkerSemaphore])

/** Emits `msg` at DEBUG level; the by-name message is only evaluated when DEBUG is enabled. */
private def logDebug(msg: => String): Unit =
  if (log.isDebugEnabled) log.debug(msg)

private val semaphore = new Semaphore(tasksPerGpu)
// Map to track which tasks have acquired the semaphore.
private val activeTasks = new ConcurrentHashMap[Long, MutableInt]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ object BucketingUtilsShim {
// table and a normal one.
val bucketIdExpression = GpuHashPartitioning(bucketColumns, spec.numBuckets)
.partitionIdExpression
GpuWriterBucketSpec(bucketIdExpression, (_: Int) => "")
new GpuWriterBucketSpec(bucketIdExpression, (_: Int) => "")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ object ParquetSchemaClipShims {
val scale = decimalLogicalTypeAnnotation.getScale

if (!(maxPrecision == -1 || 1 <= precision && precision <= maxPrecision)) {
throw new RapidsAnalysisException(s"Invalid decimal precision: $typeName " +
throw RapidsAnalysisException(s"Invalid decimal precision: $typeName " +
s"cannot store $precision digits (max $maxPrecision)")
}

Expand Down Expand Up @@ -166,14 +166,14 @@ object ParquetSchemaClipShims {
ParquetTimestampAnnotationShims.timestampTypeForMillisOrMicros(timestamp)
case timestamp: TimestampLogicalTypeAnnotation if timestamp.getUnit == TimeUnit.NANOS &&
ParquetLegacyNanoAsLongShims.legacyParquetNanosAsLong =>
throw new RapidsAnalysisException(
throw RapidsAnalysisException(
"GPU does not support spark.sql.legacy.parquet.nanosAsLong")
case _ => illegalType()
}

case INT96 =>
if (!SQLConf.get.isParquetINT96AsTimestamp) {
throw new RapidsAnalysisException(
throw RapidsAnalysisException(
"INT96 is not supported unless it's interpreted as timestamp. " +
s"Please try to set ${SQLConf.PARQUET_INT96_AS_TIMESTAMP.key} to true.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.hybrid.{CoalesceBatchConverter => NativeConverter, HybridHostRetryAllocator, RapidsHostColumn}

import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
Expand All @@ -36,7 +35,13 @@ class CoalesceConvertIterator(cpuScanIter: Iterator[ColumnarBatch],
targetBatchSizeInBytes: Long,
schema: StructType,
metrics: Map[String, GpuMetric])
extends Iterator[Array[RapidsHostColumn]] with Logging {
extends Iterator[Array[RapidsHostColumn]] {

@transient private lazy val log = org.slf4j.LoggerFactory.getLogger(
classOf[CoalesceConvertIterator])

/** Emits `msg` at INFO level; the by-name message is only evaluated when INFO is enabled. */
private def logInfo(msg: => String): Unit = {
  if (log.isInfoEnabled) {
    log.info(msg)
  }
}


private var converterImpl: NativeConverter = _

Expand Down Expand Up @@ -140,7 +145,7 @@ class CoalesceConvertIterator(cpuScanIter: Iterator[ColumnarBatch],
}
}

object CoalesceConvertIterator extends Logging {
object CoalesceConvertIterator {
/**
* Consumes the RapidsHostBatchProducer and converts the HostColumnVectors to Device ones.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2024-2025, NVIDIA CORPORATION.
* Copyright (c) 2024-2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,7 +21,6 @@ import java.util.Locale
import ai.rapids.cudf.DType
import com.nvidia.spark.rapids.{RapidsConf, VersionUtils}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnresolvedHint}
import org.apache.spark.sql.catalyst.trees.TreePattern
Expand All @@ -33,7 +32,6 @@ import org.apache.spark.sql.rapids.execution.TrampolineUtil
import org.apache.spark.sql.types._

object HybridExecutionUtils extends PredicateHelper {

private val HYBRID_JAR_PLUGIN_CLASS_NAME = "com.nvidia.spark.rapids.hybrid.HybridPluginWrapper"

/**
Expand Down Expand Up @@ -434,7 +432,7 @@ object HybridExecutionUtils extends PredicateHelper {
}
}

object HybridExecOverrides extends Logging {
object HybridExecOverrides {
// The SQL hint enables HybridScan for specific tables even if HYBRID_PARQUET_READER is disabled
val HYBRID_SCAN_HINT = "HYBRID_SCAN"

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2025, NVIDIA CORPORATION.
* Copyright (c) 2025-2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,7 +25,6 @@ import com.nvidia.spark.rapids.hybrid.RapidsHostColumn
import com.nvidia.spark.rapids.jni.RmmSpark

import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.execution.TrampolineUtil

/**
Expand Down Expand Up @@ -92,7 +91,15 @@ class PrefetchHostBatchProducer(
taskAttId: Long,
base: Iterator[Array[RapidsHostColumn]],
capacity: Int,
waitTimeMetric: GpuMetric) extends RapidsHostBatchProducer with Logging {
waitTimeMetric: GpuMetric) extends RapidsHostBatchProducer {

@transient private lazy val log = org.slf4j.LoggerFactory.getLogger(
classOf[PrefetchHostBatchProducer])

/** Emits `msg` at INFO level; the by-name message is only evaluated when INFO is enabled. */
private def logInfo(msg: => String): Unit = {
  if (log.isInfoEnabled) {
    log.info(msg)
  }
}

/** Emits `msg` at ERROR level; the by-name message is only evaluated when ERROR is enabled. */
private def logError(msg: => String): Unit = {
  if (log.isErrorEnabled) {
    log.error(msg)
  }
}


@volatile private var isInit: Boolean = false
// Mark if there is in-progress element being produced in producerThread
Expand Down
Loading
Loading