diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AllocationRetryCoverageTracker.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AllocationRetryCoverageTracker.scala index 937ac453672..3c3359682ba 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AllocationRetryCoverageTracker.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AllocationRetryCoverageTracker.scala @@ -23,7 +23,6 @@ import java.util.regex.Pattern import com.nvidia.spark.rapids.Arm.withResource -import org.apache.spark.internal.Logging /** * Memory allocation kind for retry coverage tracking. @@ -62,7 +61,17 @@ object AllocationKind extends Enumeration { * * See: https://github.com/NVIDIA/spark-rapids/issues/13672 */ -object AllocationRetryCoverageTracker extends Logging { +object AllocationRetryCoverageTracker { + private val log = org.slf4j.LoggerFactory.getLogger(getClass.getName.stripSuffix("$")) + + private def logWarning(msg: => String): Unit = { + log.warn(msg) + } + + private def logError(msg: => String, throwable: Throwable): Unit = { + log.error(msg, throwable) + } + import AllocationKind._ // Environment variable to enable retry coverage tracking (debug-only). diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ArrayUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ArrayUtils.scala index b353aa4f126..05a32a16a37 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ArrayUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ArrayUtils.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2025-2025, NVIDIA CORPORATION. + * Copyright (c) 2025-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,7 +19,7 @@ package com.nvidia.spark.rapids import org.apache.spark.sql.catalyst.expressions.{ArrayDistinct, Expression} import org.apache.spark.sql.rapids.GpuArrayDistinct -case class GpuArrayDistinctMeta( +class GpuArrayDistinctMeta( expr: ArrayDistinct, override val conf: RapidsConf, parentMetaOpt: Option[RapidsMeta[_, _, _]], diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala index 795258dfe14..ae5e172c144 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala @@ -16,13 +16,12 @@ package com.nvidia.spark.rapids -import java.time.LocalDate +import java.time.{Instant, LocalDate} import scala.collection.mutable.ListBuffer import ai.rapids.cudf.{DType, Scalar} import com.nvidia.spark.rapids.VersionUtils.isSpark320OrLater -import com.nvidia.spark.rapids.shims.DateTimeUtilsShims import org.apache.spark.sql.catalyst.util.DateTimeUtils.localDateToDays import org.apache.spark.sql.internal.SQLConf @@ -53,6 +52,11 @@ object DateUtils { val ONE_SECOND_MICROSECONDS = 1000000 + private def currentTimestampMicros: Long = { + val instant = Instant.now() + instant.getEpochSecond * ONE_SECOND_MICROSECONDS + instant.getNano / 1000 + } + val ONE_DAY_SECONDS = 86400L val ONE_DAY_MICROSECONDS = 86400000000L @@ -80,7 +84,7 @@ object DateUtils { Map.empty } else { val today = currentDate() - val now = DateTimeUtilsShims.currentTimestamp + val now = currentTimestampMicros Map( EPOCH -> 0, NOW -> now / 1000000L, @@ -94,7 +98,7 @@ object DateUtils { Map.empty } else { val today = currentDate() - val now = DateTimeUtilsShims.currentTimestamp + val now = currentTimestampMicros Map( EPOCH -> 0, NOW -> now, diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuInSet.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuInSet.scala index 55df29f3454..e116d85b311 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuInSet.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuInSet.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2025, NVIDIA CORPORATION. + * Copyright (c) 2020-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,14 +18,15 @@ package com.nvidia.spark.rapids import ai.rapids.cudf.{ColumnVector, DType, Scalar} import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} +import com.nvidia.spark.rapids.shims.ShimPredicate -import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, Predicate} +import org.apache.spark.sql.catalyst.expressions.{Expression, Literal} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DoubleType, FloatType} case class GpuInSet( child: Expression, - list: Seq[Any]) extends GpuUnaryExpression with Predicate { + list: Seq[Any]) extends GpuUnaryExpression with ShimPredicate { require(list != null, "list should not be null") @transient private[this] lazy val hasNull: Boolean = list.contains(null) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMapUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMapUtils.scala index 1de3b50de3b..e8dd755c82d 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMapUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMapUtils.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2024, NVIDIA CORPORATION. + * Copyright (c) 2021-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -177,7 +177,7 @@ object GpuMapUtils { } -case class GpuMapFromArraysMeta(expr: MapFromArrays, +class GpuMapFromArraysMeta(expr: MapFromArrays, override val conf: RapidsConf, override val parent: Option[RapidsMeta[_, _, _]], rule: DataFromReplacementRule) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMetrics.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMetrics.scala index f6ae2bbbe8e..2065bf7b375 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMetrics.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMetrics.scala @@ -21,7 +21,6 @@ import scala.collection.immutable.TreeMap import com.nvidia.spark.rapids.metrics.GpuBubbleTimerManager import org.apache.spark.{SparkContext, TaskContext} -import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} @@ -86,7 +85,7 @@ class GpuMetricFactory(metricsConf: MetricsLevel, context: SparkContext) { createInternal(level, SQLMetrics.createTimingMetric(context, name)) } -object GpuMetric extends Logging { +object GpuMetric { // Metric names. val BUFFER_TIME = "bufferTime" val BUFFER_TIME_BUBBLE = "bufferTimeBubble" diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/MemoryChecker.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/MemoryChecker.scala index 33124b28fdc..65e8e2bbcf2 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/MemoryChecker.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/MemoryChecker.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2025, NVIDIA CORPORATION. + * Copyright (c) 2025-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,7 +25,6 @@ import scala.util.{Failure, Success, Try} import com.nvidia.spark.rapids.Arm.withResource import org.apache.spark.SparkConf -import org.apache.spark.internal.Logging trait MemoryChecker { def getAvailableMemoryBytes(rapidsConf: RapidsConf): Option[Long] @@ -38,7 +37,19 @@ trait MemoryChecker { * on which it checks corresponding files, env variables, etc. for memory usage * and limits. */ -object MemoryCheckerImpl extends MemoryChecker with Logging { +object MemoryCheckerImpl extends MemoryChecker { + private val log = org.slf4j.LoggerFactory.getLogger(getClass.getName.stripSuffix("$")) + + private def logInfo(msg: => String): Unit = { + if (log.isInfoEnabled) { + log.info(msg) + } + } + + private def logWarning(msg: => String): Unit = { + log.warn(msg) + } + def main(args: Array[String]): Unit = { val conf = new RapidsConf(new SparkConf()) println(s"Available memory: ${getAvailableMemoryBytes(conf)} bytes") diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/NvtxRangeWithDoc.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/NvtxRangeWithDoc.scala index b700e917cca..7759d990439 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/NvtxRangeWithDoc.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/NvtxRangeWithDoc.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2025, NVIDIA CORPORATION. + * Copyright (c) 2025-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,18 +22,17 @@ import scala.collection.mutable import ai.rapids.cudf.{NvtxColor, NvtxRange} -import org.apache.spark.internal.Logging - -object RangeDebugger extends Logging { +object RangeDebugger { + private val log = org.slf4j.LoggerFactory.getLogger(getClass.getName.stripSuffix("$")) val threadLocalStack = new ThreadLocal[mutable.ArrayStack[NvtxId]] { override def initialValue(): mutable.ArrayStack[NvtxId] = mutable.ArrayStack[NvtxId]() } private def dumpOrderErrorMessage(popped: Option[NvtxId], elem: NvtxId): Unit = { - logError(s"OUT OF ORDER POP of $elem") - logError(s"TOP OF STACK IS ${popped.getOrElse("")}") + log.error(s"OUT OF ORDER POP of $elem") + log.error(s"TOP OF STACK IS ${popped.getOrElse("")}") val stackTrace = Thread.currentThread.getStackTrace - stackTrace.foreach(elem => logError(elem.toString)) + stackTrace.foreach(elem => log.error(elem.toString)) } def push(elem: NvtxId): Unit = { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/NvtxWithMetrics.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/NvtxWithMetrics.scala index ada85d211e8..d05ecaa576b 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/NvtxWithMetrics.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/NvtxWithMetrics.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2025, NVIDIA CORPORATION. + * Copyright (c) 2019-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -76,11 +76,11 @@ object NvtxIdWithMetrics { } } -class MetricRange(val metrics: Seq[GpuMetric], val excludeMetric: Seq[GpuMetric] = Seq.empty) +class MetricRange(val metrics: Seq[GpuMetric], val excludeMetric: Seq[GpuMetric]) extends AutoCloseable { // add a convenient constructor - def this(metrics: GpuMetric*) = this(metrics.toSeq) + def this(metrics: GpuMetric*) = this(metrics.toSeq, Seq.empty) val needTracks = metrics.map(_.tryActivateTimer(excludeMetric)) private val start = System.nanoTime() diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/PrioritySemaphore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/PrioritySemaphore.scala index 393d18569c6..f6c61f2e2e2 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/PrioritySemaphore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/PrioritySemaphore.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024-2025, NVIDIA CORPORATION. + * Copyright (c) 2024-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,11 +33,11 @@ class PrioritySemaphore[T](val maxPermits: Long, val maxConcurrentGpuTasksLimit: private var occupiedSlots: Long = 0 private var currentConcurrentGpuTasksNum: Long = 0 - private case class ThreadInfo(priority: T, - condition: Condition, - computeNumPermits: () => Long, - wasOnGpuBefore: () => Boolean, - taskId: Long) { + private class ThreadInfo(val priority: T, + val condition: Condition, + val computeNumPermits: () => Long, + val wasOnGpuBefore: () => Boolean, + val taskId: Long) { var signaled: Boolean = false var permitsUsed: Long = 0 } @@ -60,7 +60,7 @@ class PrioritySemaphore[T](val maxPermits: Long, val maxConcurrentGpuTasksLimit: if (waitingQueue.size() > 0 && priorityComp.compare( waitingQueue.peek(), - ThreadInfo(priority, null, () => numPermits, wasOnGpuBefore, taskAttemptId) + new ThreadInfo(priority, null, () => numPermits, wasOnGpuBefore, taskAttemptId) ) < 0) { false } else if (!canAcquire(numPermits)) { @@ -81,7 +81,8 @@ class PrioritySemaphore[T](val maxPermits: Long, val maxConcurrentGpuTasksLimit: val numPermitsNow = computePermits() if (!tryAcquire(numPermitsNow, priority, wasOnGpuBefore, taskAttemptId)) { val condition = lock.newCondition() - val info = ThreadInfo(priority, condition, computePermits, wasOnGpuBefore, taskAttemptId) + val info = new ThreadInfo( + priority, condition, computePermits, wasOnGpuBefore, taskAttemptId) try { waitingQueue.add(info) // only count tasks that had held semaphore before, diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/asyncProfiler.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/asyncProfiler.scala index 1cd6f625909..75bdab0b7e1 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/asyncProfiler.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/asyncProfiler.scala @@ -30,7 +30,6 @@ import org.apache.hadoop.fs.Path import org.apache.spark.TaskContext import org.apache.spark.api.plugin.PluginContext -import org.apache.spark.internal.Logging import org.apache.spark.util.SerializableConfiguration /** @@ -55,7 +54,9 @@ import org.apache.spark.util.SerializableConfiguration * */ -object AsyncProfilerOnExecutor extends Logging { +object AsyncProfilerOnExecutor { + + private val log = org.slf4j.LoggerFactory.getLogger(AsyncProfilerOnExecutor.getClass) private var asyncProfilerPrefix: Option[String] = None private var asyncProfiler: Option[AsyncProfiler] = None @@ -347,7 +348,7 @@ object AsyncProfilerOnExecutor extends Logging { val outPath = new Path(asyncProfilerPrefix.get, if (jfrCompressionEnabled) baseFileName + ".gz" else baseFileName) - val hadoopConf = pluginCtx.ask(ProfileInitMsg(executorId, outPath.toString)) + val hadoopConf = pluginCtx.ask(new ProfileInitMsg(executorId, outPath.toString)) .asInstanceOf[SerializableConfiguration].value val fs = outPath.getFileSystem(hadoopConf) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/higherOrderFunctions.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/higherOrderFunctions.scala index ae23430a90b..7364ef7ce2a 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/higherOrderFunctions.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/higherOrderFunctions.scala @@ -28,6 +28,7 @@ import com.nvidia.spark.rapids.shims.ShimExpression import org.apache.spark.sql.catalyst.analysis.TypeCoercion import org.apache.spark.sql.catalyst.expressions.{Add, And, ArrayAggregate, Attribute, AttributeReference, AttributeSeq, CaseWhen, Cast, Expression, ExprId, Greatest, If, LambdaFunction, Least, Literal, Multiply, NamedExpression, NamedLambdaVariable, Or} import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.rapids.GpuMapDedupPolicy import org.apache.spark.sql.types.{ArrayType, BooleanType, ByteType, DataType, Decimal, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, Metadata, NumericType, ShortType, StructField, StructType} import org.apache.spark.sql.vectorized.ColumnarBatch @@ -527,8 +528,7 @@ case class GpuTransformKeys( override def prettyName: String = "transform_keys" // Spark 4.1+ returns an enum value instead of String, so use toString first - private def exceptionOnDupKeys = - SQLConf.get.getConf(SQLConf.MAP_KEY_DEDUP_POLICY).toString.toUpperCase == "EXCEPTION" + private def exceptionOnDupKeys = GpuMapDedupPolicy.isException override lazy val hasSideEffects: Boolean = function.nullable || exceptionOnDupKeys || super.hasSideEffects @@ -1140,13 +1140,13 @@ case object AnyOp extends AggOp { * @param accVarExprId the accumulator NamedLambdaVariable's exprId * @param elemVar the element NamedLambdaVariable (used to build the g lambda) */ -case class ArrayAggregateDecomposition( - op: AggOp, - g: Expression, - accVarExprId: ExprId, - elemVar: NamedLambdaVariable) +class ArrayAggregateDecomposition( + val op: AggOp, + val g: Expression, + val accVarExprId: ExprId, + val elemVar: NamedLambdaVariable) extends Serializable -private case class ExtractedG(g: Expression, hasBareAccBranch: Boolean) +private class ExtractedG(val g: Expression, val hasBareAccBranch: Boolean) extends Serializable /** @@ -1223,7 +1223,7 @@ object ArrayAggregateDecomposer { "that no-contribution branch into an identity value") } - Right(ArrayAggregateDecomposition(op, g, accId, elemVar)) + Right(new ArrayAggregateDecomposition(op, g, accId, elemVar)) } /** @@ -1246,8 +1246,8 @@ object ArrayAggregateDecomposer { accId: ExprId, op: AggOp): Option[ExtractedG] = { op.matchBinary(unwrapDecimalPatternWrappers(e)).flatMap { case (l, r) => - if (isAccRef(l, accId) && !containsAccRef(r, accId)) Some(ExtractedG(r, false)) - else if (isAccRef(r, accId) && !containsAccRef(l, accId)) Some(ExtractedG(l, false)) + if (isAccRef(l, accId) && !containsAccRef(r, accId)) Some(new ExtractedG(r, false)) + else if (isAccRef(r, accId) && !containsAccRef(l, accId)) Some(new ExtractedG(l, false)) else None } } @@ -1264,7 +1264,7 @@ object ArrayAggregateDecomposer { op: AggOp, accType: DataType): Option[ExtractedG] = { if (isAccRef(branch, accId)) { - Some(ExtractedG(op.identityLiteral(accType), true)) + Some(new ExtractedG(op.identityLiteral(accType), true)) } else { extractG(branch, accId, op, accType) } @@ -1279,7 +1279,7 @@ object ArrayAggregateDecomposer { for { tG <- extractBranch(t, accId, op, accType) fG <- extractBranch(f, accId, op, accType) - } yield ExtractedG(If(cond, tG.g, fG.g), tG.hasBareAccBranch || fG.hasBareAccBranch) + } yield new ExtractedG(If(cond, tG.g, fG.g), tG.hasBareAccBranch || fG.hasBareAccBranch) case CaseWhen(branches, Some(elseValue)) if branches.forall { case (c, _) => !containsAccRef(c, accId) } => @@ -1292,7 +1292,7 @@ object ArrayAggregateDecomposer { val gBranches = branchDecs.map { case (c, dec) => (c, dec.get.g) } val hasBareAccBranch = branchDecs.exists(_._2.exists(_.hasBareAccBranch)) || elseDec.exists(_.hasBareAccBranch) - Some(ExtractedG(CaseWhen(gBranches, Some(elseDec.get.g)), hasBareAccBranch)) + Some(new ExtractedG(CaseWhen(gBranches, Some(elseDec.get.g)), hasBareAccBranch)) } case _ => None diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/limit.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/limit.scala index dd8e4f3adda..2ffb973cd63 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/limit.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/limit.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2025, NVIDIA CORPORATION. + * Copyright (c) 2020-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -453,7 +453,7 @@ case class GpuTopN( } } -case class GpuTakeOrderedAndProjectExecMeta( +class GpuTakeOrderedAndProjectExecMeta( takeExec: TakeOrderedAndProjectExec, rapidsConf: RapidsConf, parentOpt: Option[RapidsMeta[_, _, _]], diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/literals.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/literals.scala index e2d03816c77..a51e278930a 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/literals.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/literals.scala @@ -30,28 +30,26 @@ import ai.rapids.cudf.{ColumnVector, DType, HostColumnVector, Scalar} import ai.rapids.cudf.ast import com.nvidia.spark.rapids.Arm.withResource import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingArray -import com.nvidia.spark.rapids.shims.{GpuTypeShims, SparkShimImpl} +import com.nvidia.spark.rapids.shims.{GpuLiteralShim, GpuTypeShims, SparkShimImpl} import org.apache.commons.codec.binary.{Hex => ApacheHex} -import org.json4s.JsonAST.{JField, JNull, JString, JValue} -import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Literal, UnsafeArrayData} import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData, TimestampFormatter} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.rapids.execution.TrampolineUtil import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.unsafe.types.UTF8String -object GpuScalar extends Logging { +object GpuScalar { + private[this] val log = org.slf4j.LoggerFactory.getLogger(getClass.getName.stripSuffix("$")) // TODO Support interpreting the value to a Spark DataType def extract(v: Scalar): Any = { if (v != null && v.isValid) { - logDebug(s"Extracting data from the Scalar $v.") + log.debug(s"Extracting data from the Scalar $v.") v.getType match { case DType.BOOL8 => v.getBoolean case DType.FLOAT32 => v.getFloat @@ -644,7 +642,7 @@ object GpuLiteral { /** * In order to do type conversion and checking, use GpuLiteral.create() instead of constructor. */ -case class GpuLiteral (value: Any, dataType: DataType) extends GpuLeafExpression { +case class GpuLiteral (value: Any, dataType: DataType) extends GpuLiteralShim { // Assume this came from Spark Literal and no need to call Literal.validateLiteralValue here. @@ -677,19 +675,6 @@ case class GpuLiteral (value: Any, dataType: DataType) extends GpuLeafExpression case _ => false } - override protected def jsonFields: List[JField] = { - // Turns all kinds of literal values to string in json field, as the type info is hard to - // retain in json format, e.g. {"a": 123} can be an int, or double, or decimal, etc. - val jsonValue = (value, dataType) match { - case (null, _) => JNull - case (i: Int, DateType) => JString(DateTimeUtils.toJavaDate(i).toString) - case (l: Long, TimestampType) => JString(DateTimeUtils.toJavaTimestamp(l).toString) - case (other, _) => JString(other.toString) - } - ("value" -> jsonValue) :: - ("dataType" -> TrampolineUtil.jsonValue(dataType).asInstanceOf[JValue]) :: Nil - } - override def sql: String = (value, dataType) match { case (_, NullType | _: ArrayType | _: MapType | _: StructType) if value == null => "NULL" case _ if value == null => s"CAST(NULL AS ${dataType.sql})" diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/GpuLore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/GpuLore.scala index fb9e042900c..3aa4d586021 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/GpuLore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/GpuLore.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024-2025, NVIDIA CORPORATION. + * Copyright (c) 2024-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,7 +31,6 @@ import org.apache.hadoop.fs.Path import org.apache.spark.SparkEnv import org.apache.spark.broadcast.Broadcast -import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.trees.TreeNodeTag @@ -42,9 +41,14 @@ import org.apache.spark.sql.rapids.shims.SparkSessionUtils import org.apache.spark.sql.types.DataType import org.apache.spark.util.SerializableConfiguration -case class LoreRDDMeta(numPartitions: Int, outputPartitions: Seq[Int], attrs: Seq[Attribute]) +class LoreRDDMeta( + val numPartitions: Int, + val outputPartitions: Seq[Int], + val attrs: Seq[Attribute]) extends Serializable -case class LoreRDDPartitionMeta(numBatches: Int, dataType: Seq[DataType]) +class LoreRDDPartitionMeta( + val numBatches: Int, + val dataType: Seq[DataType]) extends Serializable trait GpuLoreRDD { def rootPath: Path @@ -64,7 +68,17 @@ trait GpuLoreRDD { } } -object GpuLore extends Logging { +object GpuLore { + private val log = org.slf4j.LoggerFactory.getLogger(getClass.getName.stripSuffix("$")) + + private def logWarning(msg: => String): Unit = { + log.warn(msg) + } + + private def logWarning(msg: => String, throwable: Throwable): Unit = { + log.warn(msg, throwable) + } + /** * Lore id of a plan node. */ @@ -262,12 +276,13 @@ object GpuLore extends Logging { checkUnsupportedOperator(g) val currentExecRootPath = new Path(loreOutputRootPath, s"loreId-$loreId") registerTag(g, LORE_DUMP_PATH_TAG, currentExecRootPath.toString, tagRollbacks) - val loreOutputInfo = LoreOutputInfo(outputLoreIds, + val loreOutputInfo = new LoreOutputInfo(outputLoreIds, currentExecRootPath.toString) g.children.zipWithIndex.foreach { case (child, idx) => - val dumpRDDInfo = LoreDumpRDDInfo(idx, loreOutputInfo, child.output, hadoopConf, + val dumpRDDInfo = new LoreDumpRDDInfo(idx, loreOutputInfo, child.output, + hadoopConf, useOriginalSchemaNames = rapidsConf.loreParquetUseOriginalNames, nonStrictMode = allowNonStrictMode) child match { @@ -335,7 +350,7 @@ object GpuLore extends Logging { tagRollbacks: mutable.ArrayBuffer[TagRollback], nonStrictMode: Boolean) = { val innerPlan = sub.plan.child if (innerPlan.isInstanceOf[GpuExec]) { - val dumpRDDInfo = LoreDumpRDDInfo(id, loreOutputInfo, innerPlan.output, + val dumpRDDInfo = new LoreDumpRDDInfo(id, loreOutputInfo, innerPlan.output, hadoopConf, useOriginalSchemaNames = RapidsConf.LORE_PARQUET_USE_ORIGINAL_NAMES .get(SparkSessionUtils.sessionFromPlan(innerPlan).sessionState.conf), diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/OutputLoreId.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/OutputLoreId.scala index 28fa0b2dbbf..b7f4fe37c94 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/OutputLoreId.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/OutputLoreId.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024, NVIDIA CORPORATION. + * Copyright (c) 2024-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,7 +25,7 @@ case class OutputLoreId(loreId: LoreId, partitionIds: Set[Int]) { partitionIds.contains(partitionId) } -case class LoreOutputInfo(outputLoreId: OutputLoreId, pathStr: String) { +class LoreOutputInfo(val outputLoreId: OutputLoreId, val pathStr: String) extends Serializable { def path: Path = new Path(pathStr) } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/dump.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/dump.scala index 4c446cdd0cf..9ebdae964e8 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/dump.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/dump.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024-2025, NVIDIA CORPORATION. + * Copyright (c) 2024-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,7 +27,6 @@ import org.apache.hadoop.fs.Path import org.apache.spark.{Partition, SparkContext, TaskContext} import org.apache.spark.broadcast.Broadcast -import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute @@ -37,16 +36,16 @@ import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.SerializableConfiguration -case class LoreDumpRDDInfo( - idxInParent: Int, - loreOutputInfo: LoreOutputInfo, - attrs: Seq[Attribute], - hadoopConf: Broadcast[SerializableConfiguration], - useOriginalSchemaNames: Boolean = false, - nonStrictMode: Boolean = false) +class LoreDumpRDDInfo( + val idxInParent: Int, + val loreOutputInfo: LoreOutputInfo, + val attrs: Seq[Attribute], + val hadoopConf: Broadcast[SerializableConfiguration], + val useOriginalSchemaNames: Boolean, + val nonStrictMode: Boolean) extends Serializable class GpuLoreDumpRDD(info: LoreDumpRDDInfo, input: RDD[ColumnarBatch]) - extends RDD[ColumnarBatch](input) with GpuLoreRDD with Logging { + extends RDD[ColumnarBatch](input) with GpuLoreRDD { override def rootPath: Path = pathOfChild(info.loreOutputInfo.path, info.idxInParent) private val factDataTypes = info.attrs.map(_.dataType) lazy val kudoSerializer: KudoSerializer = new KudoSerializer( @@ -54,7 +53,8 @@ class GpuLoreDumpRDD(info: LoreDumpRDDInfo, input: RDD[ColumnarBatch]) def saveMeta(): Unit = { try { - val meta = LoreRDDMeta(input.getNumPartitions, this.getPartitions.map(_.index), info.attrs) + val meta = new LoreRDDMeta(input.getNumPartitions, this.getPartitions.map(_.index), + info.attrs) GpuLore.dumpObject(meta, pathOfMeta, this.context.hadoopConfiguration) } catch { case NonFatal(e) if (info.nonStrictMode) => @@ -87,9 +87,9 @@ class GpuLoreDumpRDD(info: LoreDumpRDDInfo, input: RDD[ColumnarBatch]) .isInstanceOf[KudoSerializedTableColumn]) val partitionMeta = if (isFromShuffle) { // get the array of dataType from the info.attrs - LoreRDDPartitionMeta(batchIdx, factDataTypes) + new LoreRDDPartitionMeta(batchIdx, factDataTypes) } else { - LoreRDDPartitionMeta(batchIdx, GpuColumnVector.extractTypes(ret)) + new LoreRDDPartitionMeta(batchIdx, GpuColumnVector.extractTypes(ret)) } GpuLore.dumpObject(partitionMeta, pathOfPartitionMeta(split.index), info.hadoopConf.value.value) @@ -161,7 +161,7 @@ class SimpleRDD(_sc: SparkContext, data: Broadcast[Any], schema: StructType) ext case class GpuLoreDumpExec( child: GpuExec, loreDumpInfo: LoreDumpRDDInfo) - extends ShimUnaryExecNode with GpuExec with Logging { + extends ShimUnaryExecNode with GpuExec { override def output: Seq[Attribute] = child.output diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/replay.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/replay.scala index 27fad8ef5e8..68dca621ffb 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/replay.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/replay.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024-2025, NVIDIA CORPORATION. + * Copyright (c) 2024-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -94,8 +94,8 @@ class GpuLoreReplayRDD(sc: SparkContext, rootPathStr: String, } override protected def getPartitions: Array[Partition] = { - (0 until loreRDDMeta.numPartitions).map(LoreReplayPartition).toArray + (0 until loreRDDMeta.numPartitions).map(new LoreReplayPartition(_)).toArray } } -case class LoreReplayPartition(override val index: Int) extends Partition +class LoreReplayPartition(override val index: Int) extends Partition diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/namedExpressions.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/namedExpressions.scala index 249dae21c20..186d4f26c47 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/namedExpressions.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/namedExpressions.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2023, NVIDIA CORPORATION. + * Copyright (c) 2019-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,12 +21,10 @@ import java.util.Objects import ai.rapids.cudf.ColumnVector import ai.rapids.cudf.ast import com.nvidia.spark.rapids.RapidsPluginImplicits._ -import com.nvidia.spark.rapids.shims.SparkShimImpl import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, ExprId, Generator, NamedExpression} import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark -import org.apache.spark.sql.catalyst.util.quoteIdentifier import org.apache.spark.sql.types.{DataType, Metadata} import org.apache.spark.sql.vectorized.ColumnarBatch @@ -87,14 +85,9 @@ case class GpuAlias(child: Expression, name: String)( } override def sql: String = { - if (SparkShimImpl.hasAliasQuoteFix) { - val qualifierPrefix = - if (qualifier.nonEmpty) qualifier.map(quoteIfNeeded).mkString(".") + "." else "" - s"${child.sql} AS $qualifierPrefix${quoteIfNeeded(name)}" - } else { - val qualifierPrefix = if (qualifier.nonEmpty) qualifier.mkString(".") + "." else "" - s"${child.sql} AS $qualifierPrefix${quoteIdentifier(name)}" - } + val qualifierPrefix = + if (qualifier.nonEmpty) qualifier.map(quoteIfNeeded).mkString(".") + "." else "" + s"${child.sql} AS $qualifierPrefix${quoteIfNeeded(name)}" } private def quoteIfNeeded(part: String): String = { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/nullExpressions.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/nullExpressions.scala index d9fba601f02..862f798873c 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/nullExpressions.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/nullExpressions.scala @@ -21,9 +21,9 @@ import scala.collection.mutable import ai.rapids.cudf.{ast, BinaryOp, ColumnVector, ColumnView, DType, Scalar} import com.nvidia.spark.rapids.Arm.withResource import com.nvidia.spark.rapids.RapidsPluginImplicits._ -import com.nvidia.spark.rapids.shims.ShimExpression +import com.nvidia.spark.rapids.shims.{ShimExpression, ShimPredicate} -import org.apache.spark.sql.catalyst.expressions.{ComplexTypeMergingExpression, Expression, Predicate} +import org.apache.spark.sql.catalyst.expressions.{ComplexTypeMergingExpression, Expression} import org.apache.spark.sql.types.{DataType, DoubleType, FloatType} import org.apache.spark.sql.vectorized.ColumnarBatch @@ -126,7 +126,7 @@ case class GpuCoalesce(children: Seq[Expression]) extends GpuExpression * UnaryOp */ -case class GpuIsNull(child: Expression) extends GpuUnaryExpression with Predicate { +case class GpuIsNull(child: Expression) extends GpuUnaryExpression with ShimPredicate { override def nullable: Boolean = false override def sql: String = s"(${child.sql} IS NULL)" @@ -140,7 +140,7 @@ case class GpuIsNull(child: Expression) extends GpuUnaryExpression with Predicat } } -case class GpuIsNotNull(child: Expression) extends GpuUnaryExpression with Predicate { +case class GpuIsNotNull(child: Expression) extends GpuUnaryExpression with ShimPredicate { override def nullable: Boolean = false override def sql: String = s"(${child.sql} IS NOT NULL)" @@ -155,7 +155,7 @@ case class GpuIsNotNull(child: Expression) extends GpuUnaryExpression with Predi } } -case class GpuIsNan(child: Expression) extends GpuUnaryExpression with Predicate { +case class GpuIsNan(child: Expression) extends GpuUnaryExpression with ShimPredicate { override def nullable: Boolean = false override def sql: String = s"(${child.sql} IS NAN)" @@ -192,7 +192,7 @@ case class GpuAtLeastNNonNulls( n: Int, exprs: Seq[Expression]) extends GpuExpression with ShimExpression - with Predicate { + with ShimPredicate { override def nullable: Boolean = false override def foldable: Boolean = exprs.forall(_.foldable) override def toString: String = s"GpuAtLeastNNulls(n, ${children.mkString(",")})" diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/parquet/GpuParquetUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/parquet/GpuParquetUtils.scala index cc9791a9038..52ab121978e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/parquet/GpuParquetUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/parquet/GpuParquetUtils.scala @@ -24,9 +24,9 @@ import com.nvidia.spark.rapids.shims.parquet.GpuParquetUtilsShims import org.apache.parquet.hadoop.metadata.{BlockMetaData, ColumnChunkMetaData, ColumnPath} import org.apache.parquet.schema.MessageType -import org.apache.spark.internal.Logging -object GpuParquetUtils extends Logging { +object GpuParquetUtils { + /** * Trim block metadata to contain only the column chunks that occur in the specified schema. * The column chunks that are returned are preserved verbatim diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/parquet/ParquetCachedBatchSerializer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/parquet/ParquetCachedBatchSerializer.scala index 5ead8947dfb..d2cb5f4c863 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/parquet/ParquetCachedBatchSerializer.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/parquet/ParquetCachedBatchSerializer.scala @@ -17,7 +17,7 @@ package com.nvidia.spark.rapids.parquet import java.io.{InputStream, IOException} -import java.nio.ByteBuffer +import java.nio.{Buffer, ByteBuffer} import scala.collection.JavaConverters._ import scala.collection.mutable @@ -95,7 +95,7 @@ private class ByteBufferInputStream(private var buffer: ByteBuffer) override def skip(bytes: Long): Long = { if (buffer != null) { val amountToSkip = math.min(bytes, buffer.remaining).toInt - buffer.position(buffer.position() + amountToSkip) + buffer.asInstanceOf[Buffer].position(buffer.position() + amountToSkip) if (buffer.remaining() == 0) { cleanUp() } @@ -128,7 +128,7 @@ class ByteArrayInputFile(buff: Array[Byte]) extends InputFile { if (newPos > Int.MaxValue || newPos < Int.MinValue) { throw new IllegalStateException("seek value is out of supported range " + newPos) } - byteBuffer.position(newPos.toInt) + byteBuffer.asInstanceOf[Buffer].position(newPos.toInt) } } } @@ -231,7 +231,7 @@ case class ParquetCachedBatch( * Spark wants the producer to close the batch. We have a listener in this iterator that will close * the batch after the task is completed */ -private case class CloseableColumnBatchIterator(iter: Iterator[ColumnarBatch]) extends +private class CloseableColumnBatchIterator(val iter: Iterator[ColumnarBatch]) extends Iterator[ColumnarBatch] { var cb: ColumnarBatch = _ @@ -592,7 +592,7 @@ class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer { new ColumnarBatch(cols.safeMap(_.copyToHost()).toArray, gpuBatch.numRows()) } }) - cbRdd.mapPartitions(iter => CloseableColumnBatchIterator(iter)) + cbRdd.mapPartitions(iter => new CloseableColumnBatchIterator(iter)) } else { val origSelectedAttributesWithUnambiguousNames = sanitizeColumnNames(selectedAttributes, selectedSchemaWithNames) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/profiler.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/profiler.scala index de11a39bcd2..89cd76e5ac1 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/profiler.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/profiler.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024-2025, NVIDIA CORPORATION. + * Copyright (c) 2024-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,7 +28,6 @@ import org.apache.hadoop.fs.Path import org.apache.spark.{SparkContext, TaskContext} import org.apache.spark.api.plugin.PluginContext -import org.apache.spark.internal.Logging import org.apache.spark.io.CompressionCodec import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerStageCompleted} import org.apache.spark.sql.rapids.execution.TrampolineUtil @@ -38,7 +37,21 @@ import org.apache.spark.util.SerializableConfiguration * For profiling with com.nvidia.spark.rapids.jni.Profiler */ -object ProfilerOnExecutor extends Logging { +object ProfilerOnExecutor { + private val log = org.slf4j.LoggerFactory.getLogger(getClass.getName.stripSuffix("$")) + + private def logInfo(msg: => String): Unit = if (log.isInfoEnabled) log.info(msg) + + private def logWarning(msg: => String): Unit = if (log.isWarnEnabled) log.warn(msg) + + private def logWarning(msg: => String, throwable: Throwable): Unit = { + if (log.isWarnEnabled) log.warn(msg, throwable) + } + + private def logError(msg: => String, throwable: Throwable): Unit = { + if (log.isErrorEnabled) log.error(msg, throwable) + } + private val jobPattern = raw"SPARK_.*_JId_([0-9]+).*".r private var writer: Option[ProfileWriter] = None private var timeRanges: Option[Seq[(Long, Long)]] = None @@ -99,7 +112,7 @@ object ProfilerOnExecutor extends Logging { } catch { case l: Exception => logWarning("Unable to launch profiler, we will abort profiling session.",l) - pluginCtx.send(ProfileErrorMsg(executorId, s"error launching profiler: $l")) + pluginCtx.send(new ProfileErrorMsg(executorId, s"error launching profiler: $l")) // failed to initialize, lets close the writer, and try to shutdown. if (profileWriter != null) { Profiler.shutdown() @@ -188,7 +201,7 @@ object ProfilerOnExecutor extends Logging { if (!isProfileActive) { Profiler.start() isProfileActive = true - w.pluginCtx.send(ProfileStatusMsg(w.executorId, "profile started")) + w.pluginCtx.send(new ProfileStatusMsg(w.executorId, "profile started")) } } } @@ -198,7 +211,7 @@ object ProfilerOnExecutor extends Logging { if (isProfileActive) { Profiler.stop() isProfileActive = false - w.pluginCtx.send(ProfileStatusMsg(w.executorId, "profile stopped")) + w.pluginCtx.send(new ProfileStatusMsg(w.executorId, "profile stopped")) } } } @@ -290,7 +303,7 @@ object ProfilerOnExecutor extends Logging { (activeJobs.toArray, (activeStages ++ stageTaskCount.keys).toArray) } val (completedJobs, completedStages, allDone) = - w.pluginCtx.ask(ProfileJobStageQueryMsg(jobs, stages)) + w.pluginCtx.ask(new ProfileJobStageQueryMsg(jobs, stages)) .asInstanceOf[(Array[Int], Array[Int], Boolean)] synchronized { completedJobs.foreach(activeJobs.remove) @@ -314,7 +327,11 @@ object ProfilerOnExecutor extends Logging { class ProfileWriter( val pluginCtx: PluginContext, profilePathPrefix: String, - codec: Option[CompressionCodec]) extends Profiler.DataWriter with Logging { + codec: Option[CompressionCodec]) extends Profiler.DataWriter { + @transient private lazy val log = org.slf4j.LoggerFactory.getLogger(classOf[ProfileWriter]) + + private def logWarning(msg: => String): Unit = if (log.isWarnEnabled) log.warn(msg) + val executorId: String = pluginCtx.executorID() private val outPath = getOutputPath(profilePathPrefix, codec) private val out = openOutput(codec) @@ -333,7 +350,7 @@ class ProfileWriter( isClosed = true out.close() logWarning(s"Profiling completed, output written to $outPath") - pluginCtx.send(ProfileEndMsg(executorId, outPath.toString)) + pluginCtx.send(new ProfileEndMsg(executorId, outPath.toString)) } } @@ -355,7 +372,7 @@ class ProfileWriter( private def openOutput(codec: Option[CompressionCodec]): WritableByteChannel = { logWarning(s"Profiler initialized, output will be written to $outPath") - val hadoopConf = pluginCtx.ask(ProfileInitMsg(executorId, outPath.toString)) + val hadoopConf = pluginCtx.ask(new ProfileInitMsg(executorId, outPath.toString)) .asInstanceOf[SerializableConfiguration].value val fs = outPath.getFileSystem(hadoopConf) val fsStream = fs.create(outPath, false) @@ -364,7 +381,15 @@ class ProfileWriter( } } -object ProfilerOnDriver extends Logging { +object ProfilerOnDriver { + private val log = org.slf4j.LoggerFactory.getLogger(getClass.getName.stripSuffix("$")) + + private def logWarning(msg: => String): Unit = if (log.isWarnEnabled) log.warn(msg) + + private def logDebug(msg: => String): Unit = if (log.isDebugEnabled) log.debug(msg) + + private def logError(msg: => String): Unit = if (log.isErrorEnabled) log.error(msg) + private var hadoopConf: SerializableConfiguration = null private var jobRanges: RangeConfMatcher = null private var numJobsToProfile: Long = 0L @@ -404,29 +429,35 @@ object ProfilerOnDriver extends Logging { } def handleMsg(m: ProfileMsg): AnyRef = m match { - case ProfileInitMsg(executorId, path) => + case msg: ProfileInitMsg => + val executorId = msg.executorId + val path = msg.path logWarning(s"Profiling: Executor $executorId initialized profiler, writing to $path") if (hadoopConf == null) { throw new IllegalStateException("Hadoop configuration not set") } hadoopConf - case ProfileErrorMsg(executorId, msg) => + case msg: ProfileErrorMsg => + val executorId = msg.executorId + val errorMsg = msg.msg if (profilerErrored) { - logDebug(s"Profiling: Error starting profiler from $executorId: $msg") + logDebug(s"Profiling: Error starting profiler from $executorId: $errorMsg") } else { - logError(s"Profiling: Error starting profiler from $executorId: $msg. Suppressing others.") + logError(s"Profiling: Error starting profiler from $executorId: $errorMsg. " + + "Suppressing others.") } profilerErrored = true null - case ProfileStatusMsg(executorId, msg) => - logWarning(s"Profiling: Executor $executorId: $msg") + case msg: ProfileStatusMsg => + logWarning(s"Profiling: Executor ${msg.executorId}: ${msg.msg}") null - case ProfileJobStageQueryMsg(activeJobs, activeStages) => - val filteredJobs = activeJobs.filter(j => completedJobs.containsKey(j)) - val filteredStages = activeStages.filter(s => completedStages.containsKey(s)) + case msg: ProfileJobStageQueryMsg => + val filteredJobs = msg.activeJobs.filter(j => completedJobs.containsKey(j)) + val filteredStages = msg.activeStages.filter(s => completedStages.containsKey(s)) (filteredJobs, filteredStages, isJobsStageProfilingComplete) - case ProfileEndMsg(executorId, path) => - logWarning(s"Profiling: Executor $executorId ended profiling, profile written to $path") + case msg: ProfileEndMsg => + logWarning(s"Profiling: Executor ${msg.executorId} ended profiling, " + + s"profile written to ${msg.path}") null case _ => throw new IllegalStateException(s"Unexpected profile msg: $m") @@ -453,17 +484,7 @@ object ProfilerOnDriver extends Logging { } } -trait ProfileMsg - -case class ProfileInitMsg(executorId: String, path: String) extends ProfileMsg -case class ProfileStatusMsg(executorId: String, msg: String) extends ProfileMsg -case class ProfileErrorMsg(executorId: String, msg: String) extends ProfileMsg -case class ProfileEndMsg(executorId: String, path: String) extends ProfileMsg - -// Reply is a tuple of: +// Reply to ProfileJobStageQueryMsg is a tuple of: // - array of jobs that have completed // - array of stages that have completed // - boolean if there are no further jobs/stages to profile -case class ProfileJobStageQueryMsg( - activeJobs: Array[Int], - activeStages: Array[Int]) extends ProfileMsg diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/sparkRapidsListeners.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/sparkRapidsListeners.scala index cc95afa51fa..3742e9528ae 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/sparkRapidsListeners.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/sparkRapidsListeners.scala @@ -18,12 +18,12 @@ package com.nvidia.spark.rapids import org.apache.spark.scheduler.SparkListenerEvent -case class SparkRapidsBuildInfoEvent( - sparkRapidsBuildInfo: Map[String, String], - sparkRapidsJniBuildInfo: Map[String, String], - cudfBuildInfo: Map[String, String], - sparkRapidsPrivateBuildInfo: Map[String, String] -) extends SparkListenerEvent +class SparkRapidsBuildInfoEvent( + val sparkRapidsBuildInfo: Map[String, String], + val sparkRapidsJniBuildInfo: Map[String, String], + val cudfBuildInfo: Map[String, String], + val sparkRapidsPrivateBuildInfo: Map[String, String] +) extends SparkListenerEvent with Serializable /** * Event posted when a shuffle is unregistered, containing disk I/O savings statistics. @@ -37,11 +37,11 @@ case class SparkRapidsBuildInfoEvent( * @param numSpills Number of buffers that were spilled to disk * @param numForcedFileOnly Number of buffers that used forced file-only mode */ -case class SparkRapidsShuffleDiskSavingsEvent( - shuffleId: Int, - bytesFromMemory: Long, - bytesFromDisk: Long, - numExpansions: Int = 0, - numSpills: Int = 0, - numForcedFileOnly: Int = 0 -) extends SparkListenerEvent +class SparkRapidsShuffleDiskSavingsEvent( + val shuffleId: Int, + val bytesFromMemory: Long, + val bytesFromDisk: Long, + val numExpansions: Int, + val numSpills: Int, + val numForcedFileOnly: Int +) extends SparkListenerEvent with Serializable diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/BasicWindowCalc.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/BasicWindowCalc.scala index 7ece2df3ef6..8ce4eaa37da 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/BasicWindowCalc.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/BasicWindowCalc.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024, NVIDIA CORPORATION. + * Copyright (c) 2024-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,15 +39,27 @@ import org.apache.spark.unsafe.types.CalendarInterval * groups those two together so we can have a complete picture of how to perform these types of * aggregations. */ -case class AggAndReplace[T](agg: T, nullReplacePolicy: Option[ReplacePolicy]) +class AggAndReplace[T](val agg: T, val nullReplacePolicy: Option[ReplacePolicy]) /** * The class represents a window function and the locations of its deduped inputs after an initial * projection. */ -case class BoundGpuWindowFunction( - windowFunc: GpuWindowFunction, - boundInputLocations: Array[Int]) { +class BoundGpuWindowFunction( + val windowFunc: GpuWindowFunction, + val boundInputLocations: Array[Int]) { + + override def equals(other: Any): Boolean = other match { + case that: BoundGpuWindowFunction => + windowFunc == that.windowFunc && boundInputLocations.eq(that.boundInputLocations) + case _ => false + } + + override def hashCode(): Int = { + var result = windowFunc.## + result = 31 * result + System.identityHashCode(boundInputLocations) + result + } /** * Get the operations to perform a scan aggregation. @@ -141,7 +153,7 @@ object RangeBoundaryValue { def double(value: Double): DoubleRangeBoundaryValue = DoubleRangeBoundaryValue(value) } -case class ParsedBoundary(isUnbounded: Boolean, value: RangeBoundaryValue) +class ParsedBoundary(val isUnbounded: Boolean, val value: RangeBoundaryValue) extends Serializable object GroupedAggregations { /** @@ -295,7 +307,7 @@ object GroupedAggregations { private def getRangeBoundaryValue(boundary: Expression, orderByType: DType): ParsedBoundary = boundary match { case special: GpuSpecialFrameBoundary => - ParsedBoundary( + new ParsedBoundary( isUnbounded = special.isUnbounded, value = orderByType.getTypeId match { case DType.DTypeEnum.DECIMAL128 => RangeBoundaryValue.bigInt(special.value) @@ -308,38 +320,38 @@ object GroupedAggregations { // Get the total microseconds for TIMESTAMP_MICROSECONDS var x = TimeUnit.DAYS.toMicros(ci.days) + ci.microseconds if (x == Long.MinValue) x = Long.MaxValue - ParsedBoundary(isUnbounded = false, RangeBoundaryValue.long(Math.abs(x))) + new ParsedBoundary(isUnbounded = false, RangeBoundaryValue.long(Math.abs(x))) case GpuLiteral(value, ByteType) => var x = value.asInstanceOf[Byte] if (x == Byte.MinValue) x = Byte.MaxValue - ParsedBoundary(isUnbounded = false, RangeBoundaryValue.long(Math.abs(x))) + new ParsedBoundary(isUnbounded = false, RangeBoundaryValue.long(Math.abs(x))) case GpuLiteral(value, ShortType) => var x = value.asInstanceOf[Short] if (x == Short.MinValue) x = Short.MaxValue - ParsedBoundary(isUnbounded = false, RangeBoundaryValue.long(Math.abs(x))) + new ParsedBoundary(isUnbounded = false, RangeBoundaryValue.long(Math.abs(x))) case GpuLiteral(value, IntegerType) => var x = value.asInstanceOf[Int] if (x == Int.MinValue) x = Int.MaxValue - ParsedBoundary(isUnbounded = false, RangeBoundaryValue.long(Math.abs(x))) + new ParsedBoundary(isUnbounded = false, RangeBoundaryValue.long(Math.abs(x))) case GpuLiteral(value, LongType) => var x = value.asInstanceOf[Long] if (x == Long.MinValue) x = Long.MaxValue - ParsedBoundary(isUnbounded = false, RangeBoundaryValue.long(Math.abs(x))) + new ParsedBoundary(isUnbounded = false, RangeBoundaryValue.long(Math.abs(x))) case GpuLiteral(value, FloatType) => var x = value.asInstanceOf[Float] if (x == Float.MinValue) x = Float.MaxValue - ParsedBoundary(isUnbounded = false, RangeBoundaryValue.double(Math.abs(x))) + new ParsedBoundary(isUnbounded = false, RangeBoundaryValue.double(Math.abs(x))) case GpuLiteral(value, DoubleType) => var x = value.asInstanceOf[Double] if (x == Double.MinValue) x = Double.MaxValue - ParsedBoundary(isUnbounded = false, RangeBoundaryValue.double(Math.abs(x))) + new ParsedBoundary(isUnbounded = false, RangeBoundaryValue.double(Math.abs(x))) case GpuLiteral(value: Decimal, DecimalType()) => orderByType.getTypeId match { case DType.DTypeEnum.DECIMAL32 | DType.DTypeEnum.DECIMAL64 => - ParsedBoundary(isUnbounded = false, + new ParsedBoundary(isUnbounded = false, RangeBoundaryValue.long(Math.abs(value.toUnscaledLong))) case DType.DTypeEnum.DECIMAL128 => - ParsedBoundary(isUnbounded = false, + new ParsedBoundary(isUnbounded = false, RangeBoundaryValue.bigInt(value.toJavaBigDecimal.unscaledValue().abs)) case anythingElse => throw new UnsupportedOperationException(s"Unexpected Decimal type: $anythingElse") @@ -378,7 +390,7 @@ class GroupedAggregations { data.getOrElseUpdate(win.normalizedFrameSpec, mutable.HashMap.empty) } - forSpec.getOrElseUpdate(BoundGpuWindowFunction(win.wrappedWindowFunc, inputLocs), + forSpec.getOrElseUpdate(new BoundGpuWindowFunction(win.wrappedWindowFunc, inputLocs), ArrayBuffer.empty) += outputIndex } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuBatchedBoundedWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuBatchedBoundedWindowExec.scala index 49e6d5b23be..dbc2d99c054 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuBatchedBoundedWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuBatchedBoundedWindowExec.scala @@ -24,7 +24,6 @@ import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion import org.apache.spark.TaskContext -import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression, SortOrder} import org.apache.spark.sql.execution.SparkPlan @@ -41,7 +40,7 @@ class GpuBatchedBoundedWindowIterator( maxFollowing: Int, numOutputBatches: GpuMetric, numOutputRows: GpuMetric, - opTime: GpuMetric) extends Iterator[ColumnarBatch] with BasicWindowCalc with Logging { + opTime: GpuMetric) extends Iterator[ColumnarBatch] with BasicWindowCalc with RapidsLocalLog { override def isRunningBatched: Boolean = false // Not "Running Window" optimized. // This is strictly for batching. diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowExec.scala index d244550b878..30ffc771d70 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowExec.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024-2025, NVIDIA CORPORATION. + * Copyright (c) 2024-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -125,8 +125,8 @@ object TableAndBatchUtils { // the ride-along columns and the aggregation result should both be sorted by the partition by // columns. Also the aggregation result must have a count column so it can be expanded using // repeat to get back to the size of the ride-along columns. -case class FirstPassAggResult(rideAlongColumns: SpillableColumnarBatch, - aggResult: SpillableColumnarBatch) extends AutoCloseable { +class FirstPassAggResult(val rideAlongColumns: SpillableColumnarBatch, + val aggResult: SpillableColumnarBatch) extends AutoCloseable { override def close(): Unit = { rideAlongColumns.close() aggResult.close() @@ -219,7 +219,7 @@ class GpuUnboundedToUnboundedAggWindowFirstPassIterator( val rideAlongColumns = GpuProjectExec.project(preProcessedInput, boundStages.boundRideAlong) - FirstPassAggResult( + new FirstPassAggResult( adoptAndMakeSpillable(rideAlongColumns), toSpillableBatch(aggResultTable, boundStages.groupingColumnTypes ++ boundStages.aggResultTypes)) @@ -333,8 +333,8 @@ class PartitionedFirstPassAggResult(firstPassAggResult: FirstPassAggResult, // an iterator of ride-along columns, and the full agg results for those columns. It is not // the responsibility of the second stage to try and combine small batches or split up large // ones, beyond what the retry framework might do. -case class SecondPassAggResult(rideAlongColumns: util.LinkedList[SpillableColumnarBatch], - aggResult: SpillableColumnarBatch) extends AutoCloseable { +class SecondPassAggResult(val rideAlongColumns: util.LinkedList[SpillableColumnarBatch], + val aggResult: SpillableColumnarBatch) extends AutoCloseable { override def close(): Unit = { rideAlongColumns.forEach(_.close()) rideAlongColumns.clear() @@ -467,7 +467,7 @@ class GpuUnboundedToUnboundedAggWindowSecondPassIterator( .asInstanceOf[util.LinkedList[SpillableColumnarBatch]] completedRideAlongBatches.add(partitioned.otherGroupRideAlong.get) val groupsRemoved = removeGroupColumns(mergedAggResults) - SecondPassAggResult(completedRideAlongBatches, + new SecondPassAggResult(completedRideAlongBatches, groupsRemoved) } } @@ -503,7 +503,7 @@ class GpuUnboundedToUnboundedAggWindowSecondPassIterator( boundStages.groupingColumnTypes ++ boundStages.aggResultTypes)) { concatAggResults => withResource(groupByMerge(concatAggResults)) { mergedAggResults => - Some(SecondPassAggResult(rideAlongColumnsPendingCompletion, + Some(new SecondPassAggResult(rideAlongColumnsPendingCompletion, removeGroupColumns(mergedAggResults))) } } @@ -521,8 +521,8 @@ class GpuUnboundedToUnboundedAggWindowSecondPassIterator( // The next to final step is to take the original input data along with the agg data, estimate how // to split/combine the input batches to output batches that are close to the target batch size. -case class SlicedBySize(rideAlongColumns: SpillableColumnarBatch, - aggResults: SpillableColumnarBatch) extends AutoCloseable { +class SlicedBySize(val rideAlongColumns: SpillableColumnarBatch, + val aggResults: SpillableColumnarBatch) extends AutoCloseable { override def close(): Unit = { rideAlongColumns.close() aggResults.close() @@ -827,9 +827,9 @@ class PendingSecondAggResults private( if (rideAlongColumns.isEmpty) { // This is the last batch so we don't need to even figure out where to slice // the AggResult - SlicedBySize(rideAlongScb, aggResult.incRefCount()) + new SlicedBySize(rideAlongScb, aggResult.incRefCount()) } else { - SlicedBySize(rideAlongScb, getSlicedAggResultByRepeatedRows(rideAlongScb.numRows())) + new SlicedBySize(rideAlongScb, getSlicedAggResultByRepeatedRows(rideAlongScb.numRows())) } } } @@ -943,12 +943,12 @@ class GpuUnboundedToUnboundedAggFinalIterator( * @param boundAggregations aggregations to be done. NOTE THIS IS WIP * @param boundFinalProject the final project to get the output in the right order */ -case class GpuUnboundedToUnboundedAggStages( - inputTypes: Seq[DataType], - boundPartitionSpec: Seq[GpuExpression], - boundRideAlong: Seq[GpuExpression], - boundAggregations: Seq[GpuExpression], - boundFinalProject: Seq[GpuExpression]) extends Serializable { +class GpuUnboundedToUnboundedAggStages( + val inputTypes: Seq[DataType], + val boundPartitionSpec: Seq[GpuExpression], + val boundRideAlong: Seq[GpuExpression], + val boundAggregations: Seq[GpuExpression], + val boundFinalProject: Seq[GpuExpression]) extends Serializable { val groupingColumnTypes: Seq[DataType] = boundPartitionSpec.map{_.dataType} val groupColumnOrdinals: Seq[Int] = boundPartitionSpec.map { @@ -1108,7 +1108,7 @@ object GpuUnboundedToUnboundedAggWindowIterator { val finalProject = computeFinalProject(rideAlongOutput, aggsToRepeatOutput, windowOps, metrics) - GpuUnboundedToUnboundedAggStages(childTypes, boundPartitionSpec, boundRideAlong, + new GpuUnboundedToUnboundedAggStages(childTypes, boundPartitionSpec, boundRideAlong, boundAggregations, finalProject) } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuWindowExecMeta.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuWindowExecMeta.scala index fdb786d98f9..792ca5bff94 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuWindowExecMeta.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuWindowExecMeta.scala @@ -221,11 +221,12 @@ class GpuWindowExecMeta(windowExec: WindowExec, override def getResultColumnsOnly: Boolean = resultColumnsOnly } -case class BatchedOps(running: Seq[NamedExpression], - unboundedAgg: Seq[NamedExpression], - unboundedDoublePass: Seq[NamedExpression], - bounded: Seq[NamedExpression], - passThrough: Seq[NamedExpression]) { +class BatchedOps( + val running: Seq[NamedExpression], + val unboundedAgg: Seq[NamedExpression], + val unboundedDoublePass: Seq[NamedExpression], + val bounded: Seq[NamedExpression], + val passThrough: Seq[NamedExpression]) { private def dedupeByExprId[T <: NamedExpression](exprs: Seq[T]): Seq[T] = { val seen = mutable.HashSet.empty[ExprId] @@ -614,7 +615,7 @@ object GpuWindowExecMeta { throw new IllegalArgumentException( s"Found unexpected expression $other in window exec ${other.getClass}") } - BatchedOps(running.toSeq, + new BatchedOps(running.toSeq, unboundedToUnboundedAgg.toSeq, doublePass.toSeq, batchedBounded.toSeq, diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuWindowExpression.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuWindowExpression.scala index 2ea997a222d..280f8e9e2a8 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuWindowExpression.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuWindowExpression.scala @@ -28,7 +28,6 @@ import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.shims.{GpuWindowUtil, ShimExpression} import scala.util.{Left, Right} -import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess} import org.apache.spark.sql.catalyst.expressions._ @@ -1088,7 +1087,7 @@ class BatchedUnboundedToUnboundedBinaryFixer(val binOp: BinaryOp, val dataType: * right thing when they see a null. */ class BatchedRunningWindowBinaryFixer(val binOp: BinaryOp, val name: String) - extends BatchedRunningWindowFixer with Logging { + extends BatchedRunningWindowFixer with RapidsLocalLog { private var previousResult: Option[Scalar] = None // checkpoint @@ -1157,7 +1156,7 @@ class BatchedRunningWindowBinaryFixer(val binOp: BinaryOp, val name: String) * @param ignoreNulls Whether the function needs to ignore NULL values in the calculation. */ abstract class FirstLastRunningWindowFixerBase(val name: String, val ignoreNulls: Boolean = false) - extends BatchedRunningWindowFixer with Logging { + extends BatchedRunningWindowFixer with RapidsLocalLog { /** * Saved "carry-over" result that might be applied to the next batch. @@ -1365,7 +1364,7 @@ class LastRunningWindowFixer(ignoreNulls: Boolean = false) * might be able to make this more generic but we need to see what the use case really is. */ class SumBinaryFixer(toType: DataType, isAnsi: Boolean) - extends BatchedRunningWindowFixer with Logging { + extends BatchedRunningWindowFixer with RapidsLocalLog { private val name = "sum" private var previousResult: Option[Scalar] = None private var previousOverflow: Option[Scalar] = None @@ -1642,7 +1641,7 @@ class SumBinaryFixer(toType: DataType, isAnsi: Boolean) * happens in the `scanCombine` method for GpuRank. It is a little ugly but it works to maintain * the requirement that the input to the fixer is a single column. */ -class RankFixer extends BatchedRunningWindowFixer with Logging { +class RankFixer extends BatchedRunningWindowFixer with RapidsLocalLog { import RankFixer._ // We have to look at row number as well as rank. This fixer is the same one that `GpuRowNumber` @@ -1769,7 +1768,7 @@ object RankFixer { * If anything is outside of a continues partition by group then we just keep * those values unchanged. */ -class DenseRankFixer extends BatchedRunningWindowFixer with Logging { +class DenseRankFixer extends BatchedRunningWindowFixer with RapidsLocalLog { import DenseRankFixer._ private var previousRank: Option[Scalar] = None @@ -1942,11 +1941,11 @@ case class GpuRank(children: Seq[Expression]) extends GpuRunningWindowFunction isRunningBatched: Boolean): Seq[AggAndReplace[GroupByScanAggregation]] = { if (isRunningBatched) { // We are computing both rank and row number so we can fix it up at the end - Seq(AggAndReplace(GroupByScanAggregation.rank(), None), - AggAndReplace(GroupByScanAggregation.sum(), None)) + Seq(new AggAndReplace(GroupByScanAggregation.rank(), None), + new AggAndReplace(GroupByScanAggregation.sum(), None)) } else { // Not batched just do the rank - Seq(AggAndReplace(GroupByScanAggregation.rank(), None)) + Seq(new AggAndReplace(GroupByScanAggregation.rank(), None)) } } @@ -1955,10 +1954,12 @@ case class GpuRank(children: Seq[Expression]) extends GpuRunningWindowFunction override def scanAggregation(isRunningBatched: Boolean): Seq[AggAndReplace[ScanAggregation]] = { if (isRunningBatched) { // We are computing both rank and row number so we can fix it up at the end - Seq(AggAndReplace(ScanAggregation.rank(), None), AggAndReplace(ScanAggregation.sum(), None)) + Seq( + new AggAndReplace(ScanAggregation.rank(), None), + new AggAndReplace(ScanAggregation.sum(), None)) } else { // Not batched just do the rank - Seq(AggAndReplace(ScanAggregation.rank(), None)) + Seq(new AggAndReplace(ScanAggregation.rank(), None)) } } @@ -2007,13 +2008,13 @@ case class GpuDenseRank(children: Seq[Expression]) extends GpuRunningWindowFunct override def groupByScanAggregation( isRunningBatched: Boolean): Seq[AggAndReplace[GroupByScanAggregation]] = - Seq(AggAndReplace(GroupByScanAggregation.denseRank(), None)) + Seq(new AggAndReplace(GroupByScanAggregation.denseRank(), None)) override def scanInputProjection(isRunningBatched: Boolean): Seq[Expression] = groupByScanInputProjection(isRunningBatched) override def scanAggregation(isRunningBatched: Boolean): Seq[AggAndReplace[ScanAggregation]] = - Seq(AggAndReplace(ScanAggregation.denseRank(), None)) + Seq(new AggAndReplace(ScanAggregation.denseRank(), None)) override def newFixer(): BatchedRunningWindowFixer = new DenseRankFixer() } @@ -2040,14 +2041,14 @@ case object GpuRowNumber extends GpuRunningWindowFunction override def groupByScanAggregation( isRunningBatched: Boolean): Seq[AggAndReplace[GroupByScanAggregation]] = - Seq(AggAndReplace(GroupByScanAggregation.sum(), None)) + Seq(new AggAndReplace(GroupByScanAggregation.sum(), None)) // For regular scans cudf does not support ROW_NUMBER, nor does it support COUNT_ALL // so we will do a SUM on a column of 1s override def scanInputProjection(isRunningBatched: Boolean): Seq[Expression] = groupByScanInputProjection(isRunningBatched) override def scanAggregation(isRunningBatched: Boolean): Seq[AggAndReplace[ScanAggregation]] = - Seq(AggAndReplace(ScanAggregation.sum(), None)) + Seq(new AggAndReplace(ScanAggregation.sum(), None)) override def scanCombine(isRunningBatched: Boolean, cols: Seq[ColumnVector]): ColumnVector = { cols.head.castTo(DType.INT32) diff --git a/tools/generated_files/411/operatorsScore.csv b/tools/generated_files/411/operatorsScore.csv index 9bcbd035158..87b8a403879 100644 --- a/tools/generated_files/411/operatorsScore.csv +++ b/tools/generated_files/411/operatorsScore.csv @@ -302,6 +302,7 @@ Subtract,4 Sum,4 Tan,4 Tanh,4 +TimestampAddInterval,4 ToDegrees,4 ToRadians,4 ToUTCTimestamp,4 diff --git a/tools/generated_files/411/supportedExprs.csv b/tools/generated_files/411/supportedExprs.csv index b14305229f8..a710e653b43 100644 --- a/tools/generated_files/411/supportedExprs.csv +++ b/tools/generated_files/411/supportedExprs.csv @@ -659,6 +659,9 @@ Tanh,S,`tanh`,None,project,input,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA, Tanh,S,`tanh`,None,project,result,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA Tanh,S,`tanh`,None,AST,input,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA Tanh,S,`tanh`,None,AST,result,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA +TimestampAddInterval,S, ,None,project,start,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA +TimestampAddInterval,S, ,None,project,interval,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,S,NA +TimestampAddInterval,S, ,None,project,result,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA ToDegrees,S,`degrees`,None,project,input,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA ToDegrees,S,`degrees`,None,project,result,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA ToRadians,S,`radians`,None,project,input,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA