diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/FunctionsImpl.scala b/sql-plugin/src/main/scala/com/nvidia/spark/FunctionsImpl.scala index 7c27cb79054..54b47384466 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/FunctionsImpl.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/FunctionsImpl.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024, NVIDIA CORPORATION. + * Copyright (c) 2024-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,7 +35,7 @@ class FunctionsImpl extends Functions { * nondeterministic, call the API `UserDefinedFunction.asNondeterministic()`. */ override def df_udf(f: Function0[Column]): UserDefinedFunction = - sp_udf(DFUDF0(f), LongType) + sp_udf(new DFUDF0(f), LongType) /** * Defines a Scala closure of Columns as user-defined function (UDF). @@ -43,7 +43,7 @@ class FunctionsImpl extends Functions { * nondeterministic, call the API `UserDefinedFunction.asNondeterministic()`. */ override def df_udf(f: Function1[Column, Column]): UserDefinedFunction = - sp_udf(DFUDF1(f), LongType) + sp_udf(new DFUDF1(f), LongType) /** * Defines a Scala closure of Columns as user-defined function (UDF). @@ -51,7 +51,7 @@ class FunctionsImpl extends Functions { * nondeterministic, call the API `UserDefinedFunction.asNondeterministic()`. */ override def df_udf(f: Function2[Column, Column, Column]): UserDefinedFunction = - sp_udf(DFUDF2(f), LongType) + sp_udf(new DFUDF2(f), LongType) /** * Defines a Scala closure of Columns as user-defined function (UDF). @@ -59,7 +59,7 @@ class FunctionsImpl extends Functions { * nondeterministic, call the API `UserDefinedFunction.asNondeterministic()`. */ override def df_udf(f: Function3[Column, Column, Column, Column]): UserDefinedFunction = - sp_udf(DFUDF3(f), LongType) + sp_udf(new DFUDF3(f), LongType) /** * Defines a Scala closure of Columns as user-defined function (UDF). @@ -67,7 +67,7 @@ class FunctionsImpl extends Functions { * nondeterministic, call the API `UserDefinedFunction.asNondeterministic()`. */ override def df_udf(f: Function4[Column, Column, Column, Column, Column]): UserDefinedFunction = - sp_udf(DFUDF4(f), LongType) + sp_udf(new DFUDF4(f), LongType) /** * Defines a Scala closure of Columns as user-defined function (UDF). @@ -75,7 +75,7 @@ class FunctionsImpl extends Functions { * nondeterministic, call the API `UserDefinedFunction.asNondeterministic()`. */ override def df_udf(f: Function5[Column, Column, Column, Column, Column, - Column]): UserDefinedFunction = sp_udf(DFUDF5(f), LongType) + Column]): UserDefinedFunction = sp_udf(new DFUDF5(f), LongType) /** * Defines a Scala closure of Columns as user-defined function (UDF). @@ -83,7 +83,7 @@ class FunctionsImpl extends Functions { * nondeterministic, call the API `UserDefinedFunction.asNondeterministic()`. */ override def df_udf(f: Function6[Column, Column, Column, Column, Column, Column, - Column]): UserDefinedFunction = sp_udf(DFUDF6(f), LongType) + Column]): UserDefinedFunction = sp_udf(new DFUDF6(f), LongType) /** * Defines a Scala closure of Columns as user-defined function (UDF). @@ -91,7 +91,7 @@ class FunctionsImpl extends Functions { * nondeterministic, call the API `UserDefinedFunction.asNondeterministic()`. */ override def df_udf(f: Function7[Column, Column, Column, Column, Column, Column, - Column, Column]): UserDefinedFunction = sp_udf(DFUDF7(f), LongType) + Column, Column]): UserDefinedFunction = sp_udf(new DFUDF7(f), LongType) /** * Defines a Scala closure of Columns as user-defined function (UDF). @@ -99,7 +99,7 @@ class FunctionsImpl extends Functions { * nondeterministic, call the API `UserDefinedFunction.asNondeterministic()`. */ override def df_udf(f: Function8[Column, Column, Column, Column, Column, Column, - Column, Column, Column]): UserDefinedFunction = sp_udf(DFUDF8(f), LongType) + Column, Column, Column]): UserDefinedFunction = sp_udf(new DFUDF8(f), LongType) /** * Defines a Scala closure of Columns as user-defined function (UDF). @@ -107,7 +107,7 @@ class FunctionsImpl extends Functions { * nondeterministic, call the API `UserDefinedFunction.asNondeterministic()`. */ override def df_udf(f: Function9[Column, Column, Column, Column, Column, Column, - Column, Column, Column, Column]): UserDefinedFunction = sp_udf(DFUDF9(f), LongType) + Column, Column, Column, Column]): UserDefinedFunction = sp_udf(new DFUDF9(f), LongType) /** * Defines a Scala closure of Columns as user-defined function (UDF). @@ -115,7 +115,7 @@ class FunctionsImpl extends Functions { * nondeterministic, call the API `UserDefinedFunction.asNondeterministic()`. */ override def df_udf(f: Function10[Column, Column, Column, Column, Column, Column, - Column, Column, Column, Column, Column]): UserDefinedFunction = sp_udf(DFUDF10(f), LongType) + Column, Column, Column, Column, Column]): UserDefinedFunction = sp_udf(new DFUDF10(f), LongType) ////////////////////////////////////////////////////////////////////////////////////////////// @@ -128,7 +128,7 @@ class FunctionsImpl extends Functions { * API `UserDefinedFunction.asNondeterministic()`. */ override def df_udf(f: UDF0[Column]): UserDefinedFunction = - sp_udf(JDFUDF0(f), LongType) + sp_udf(new JDFUDF0(f), LongType) /** * Defines a Java UDF instance of Columns as user-defined function (UDF). @@ -136,7 +136,7 @@ class FunctionsImpl extends Functions { * API `UserDefinedFunction.asNondeterministic()`. */ override def df_udf(f: UDF1[Column, Column]): UserDefinedFunction = - sp_udf(JDFUDF1(f), LongType) + sp_udf(new JDFUDF1(f), LongType) /** * Defines a Java UDF instance of Columns as user-defined function (UDF). @@ -144,7 +144,7 @@ class FunctionsImpl extends Functions { * API `UserDefinedFunction.asNondeterministic()`. */ override def df_udf(f: UDF2[Column, Column, Column]): UserDefinedFunction = - sp_udf(JDFUDF2(f), LongType) + sp_udf(new JDFUDF2(f), LongType) /** * Defines a Java UDF instance of Columns as user-defined function (UDF). @@ -152,7 +152,7 @@ class FunctionsImpl extends Functions { * API `UserDefinedFunction.asNondeterministic()`. */ override def df_udf(f: UDF3[Column, Column, Column, Column]): UserDefinedFunction = - sp_udf(JDFUDF3(f), LongType) + sp_udf(new JDFUDF3(f), LongType) /** * Defines a Java UDF instance of Columns as user-defined function (UDF). @@ -160,7 +160,7 @@ class FunctionsImpl extends Functions { * API `UserDefinedFunction.asNondeterministic()`. */ override def df_udf(f: UDF4[Column, Column, Column, Column, Column]): UserDefinedFunction = - sp_udf(JDFUDF4(f), LongType) + sp_udf(new JDFUDF4(f), LongType) /** * Defines a Java UDF instance of Columns as user-defined function (UDF). @@ -168,7 +168,7 @@ class FunctionsImpl extends Functions { * API `UserDefinedFunction.asNondeterministic()`. */ override def df_udf(f: UDF5[Column, Column, Column, Column, Column, - Column]): UserDefinedFunction = sp_udf(JDFUDF5(f), LongType) + Column]): UserDefinedFunction = sp_udf(new JDFUDF5(f), LongType) /** * Defines a Java UDF instance of Columns as user-defined function (UDF). @@ -176,7 +176,7 @@ class FunctionsImpl extends Functions { * API `UserDefinedFunction.asNondeterministic()`. */ override def df_udf(f: UDF6[Column, Column, Column, Column, Column, Column, - Column]): UserDefinedFunction = sp_udf(JDFUDF6(f), LongType) + Column]): UserDefinedFunction = sp_udf(new JDFUDF6(f), LongType) /** * Defines a Java UDF instance of Columns as user-defined function (UDF). @@ -184,7 +184,7 @@ class FunctionsImpl extends Functions { * API `UserDefinedFunction.asNondeterministic()`. */ override def df_udf(f: UDF7[Column, Column, Column, Column, Column, Column, - Column, Column]): UserDefinedFunction = sp_udf(JDFUDF7(f), LongType) + Column, Column]): UserDefinedFunction = sp_udf(new JDFUDF7(f), LongType) /** * Defines a Java UDF instance of Columns as user-defined function (UDF). @@ -192,7 +192,7 @@ class FunctionsImpl extends Functions { * API `UserDefinedFunction.asNondeterministic()`. */ override def df_udf(f: UDF8[Column, Column, Column, Column, Column, Column, - Column, Column, Column]): UserDefinedFunction = sp_udf(JDFUDF8(f), LongType) + Column, Column, Column]): UserDefinedFunction = sp_udf(new JDFUDF8(f), LongType) /** * Defines a Java UDF instance of Columns as user-defined function (UDF). @@ -200,7 +200,7 @@ class FunctionsImpl extends Functions { * API `UserDefinedFunction.asNondeterministic()`. */ override def df_udf(f: UDF9[Column, Column, Column, Column, Column, Column, - Column, Column, Column, Column]): UserDefinedFunction = sp_udf(JDFUDF9(f), LongType) + Column, Column, Column, Column]): UserDefinedFunction = sp_udf(new JDFUDF9(f), LongType) /** * Defines a Java UDF instance of Columns as user-defined function (UDF). @@ -208,5 +208,6 @@ class FunctionsImpl extends Functions { * API `UserDefinedFunction.asNondeterministic()`. */ override def df_udf(f: UDF10[Column, Column, Column, Column, Column, Column, - Column, Column, Column, Column, Column]): UserDefinedFunction = sp_udf(JDFUDF10(f), LongType) + Column, Column, Column, Column, Column]): UserDefinedFunction = + sp_udf(new JDFUDF10(f), LongType) } \ No newline at end of file diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AbstractGpuJoinIterator.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AbstractGpuJoinIterator.scala index 35ff58d81d9..5eb4d3bfcd5 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AbstractGpuJoinIterator.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AbstractGpuJoinIterator.scala @@ -23,7 +23,6 @@ import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{splitTargetSizeInHalfGpu, import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion import org.apache.spark.TaskContext -import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.{InnerLike, JoinType, LeftOuter, RightOuter} import org.apache.spark.sql.rapids.execution.GatherMapsResult @@ -148,7 +147,7 @@ abstract class AbstractGpuJoinIterator( // less from the gatherer, but because the gatherer tracks how much is used, the // next call to this function will start in the right place. val estimatedDataSize = (gather.numRowsLeft * gather.realCheapPerRowSizeEstimate).toLong - val targetSizeWrapper = AutoCloseableTargetSize(targetSize, minTargetSize, + val targetSizeWrapper = new AutoCloseableTargetSize(targetSize, minTargetSize, estimatedDataSize) gather.checkpoint() withRetry(targetSizeWrapper, splitTargetSizeInHalfGpu) { attempt => @@ -199,7 +198,7 @@ abstract class SplittableJoinIterator( targetSize, sizeEstimateThreshold, opTime = opTime, - joinTime = joinTime) with Logging { + joinTime = joinTime) with RapidsLocalLog { // For some join types even if there is no stream data we might output something private var isInitialJoin = true // If the join explodes this holds batches from the stream side split into smaller pieces. @@ -364,7 +363,7 @@ abstract class SplittableJoinIterator( case None if joinType == RightOuter && rightData.numCols > 0 => // Distinct right outer joins only produce a single gather map since right table rows // are not rearranged by the join. - MultiJoinGather(leftGatherer, new JoinGathererSameTable(rightData)) + new MultiJoinGather(leftGatherer, new JoinGathererSameTable(rightData)) case None => // When there isn't a `rightMap` we are in either LeftSemi or LeftAnti joins. // In these cases, the map and the table are both the left side, and everything in the map @@ -383,7 +382,7 @@ abstract class SplittableJoinIterator( } val lazyRightMap = LazySpillableGatherMap(right, "right_map") val rightGatherer = JoinGatherer(lazyRightMap, rightData, rightOutOfBoundsPolicy) - MultiJoinGather(leftGatherer, rightGatherer) + new MultiJoinGather(leftGatherer, rightGatherer) } if (gatherer.isDone) { // Nothing matched... diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/BatchWithPartitionData.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/BatchWithPartitionData.scala index d0f61d884d7..8d52586bcd7 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/BatchWithPartitionData.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/BatchWithPartitionData.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023-2024, NVIDIA CORPORATION. + * Copyright (c) 2023-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,28 +28,6 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.types.{StringType, StructType} import org.apache.spark.sql.vectorized.ColumnarBatch -/** - * Wrapper class that specifies how many rows to replicate - * the partition value. - */ -case class PartitionRowData(rowValue: InternalRow, rowNum: Int) - -object PartitionRowData { - def from(rowValues: Array[InternalRow], rowNums: Array[Int]): Array[PartitionRowData] = { - rowValues.zip(rowNums).map { - case (rowValue, rowNum) => PartitionRowData(rowValue, rowNum) - } - } - - def from(rowValues: Array[InternalRow], rowNums: Array[Long]): Array[PartitionRowData] = { - rowValues.zip(rowNums).map { - case (rowValue, rowNum) => - require(rowNum <= Integer.MAX_VALUE, s"Row number $rowNum exceeds max value of an integer.") - PartitionRowData(rowValue, rowNum.toInt) - } - } -} - /** * Class to wrap columnar batch and partition rows data and utility functions to merge them. * @@ -59,10 +37,10 @@ object PartitionRowData { * rows to replicate the partition value. * @param partitionSchema Schema of the partitioned data. */ -case class BatchWithPartitionData( - inputBatch: SpillableColumnarBatch, - partitionedRowsData: Array[PartitionRowData], - partitionSchema: StructType) extends AutoCloseable { +class BatchWithPartitionData( + val inputBatch: SpillableColumnarBatch, + val partitionedRowsData: Array[PartitionRowData], + val partitionSchema: StructType) extends AutoCloseable { /** * Merges the partitioned data with the input ColumnarBatch. @@ -98,7 +76,9 @@ case class BatchWithPartitionData( val dataType = field.dataType // Create an array to hold the individual columns for each partition. val singlePartCols = partitionedRowsData.safeMap { - case PartitionRowData(valueRow, rowNum) => + partitionRowData => + val valueRow = partitionRowData.rowValue + val rowNum = partitionRowData.rowNum val singleValue = valueRow.get(colIndex, dataType) withResource(GpuScalar.from(singleValue, dataType)) { singleScalar => // Create a column vector from the GPU scalar, associated with the row number. @@ -272,14 +252,14 @@ object BatchWithPartitionDataUtils { // Splitting occurs if for any column, maximum rows we can fit is less than rows in partition. splitOccurred = maxRows < rowsInPartition if (splitOccurred) { - currentBatch.append(PartitionRowData(valuesInPartition, maxRows)) + currentBatch.append(new PartitionRowData(valuesInPartition, maxRows)) resultBatches.append(currentBatch.toArray) currentBatch.clear() java.util.Arrays.fill(sizeOfBatch, 0) rowsInPartition -= maxRows } else { // If there was no split, all rows can fit in current batch. - currentBatch.append(PartitionRowData(valuesInPartition, rowsInPartition)) + currentBatch.append(new PartitionRowData(valuesInPartition, rowsInPartition)) val partitionSizes = calculatePartitionSizes(rowsInPartition, valuesInPartition, partSchema) sizeOfBatch.indices.foreach(i => sizeOfBatch(i) += partitionSizes(i)) } @@ -364,7 +344,7 @@ object BatchWithPartitionDataUtils { // Combine the split GPU ColumnVectors with partition ColumnVectors. splitColumnarBatches.zip(listOfPartitionedRowsData).map { case (spillableBatch, partitionedRowsData) => - BatchWithPartitionData(spillableBatch, partitionedRowsData, partitionSchema) + new BatchWithPartitionData(spillableBatch, partitionedRowsData, partitionSchema) } } } @@ -397,9 +377,7 @@ object BatchWithPartitionDataUtils { listOfPartitionedRowsData: Array[Array[PartitionRowData]]): Seq[Int] = { // Calculate the row counts for each batch val rowCountsForEachBatch = listOfPartitionedRowsData.map(partitionData => - partitionData.map { - case PartitionRowData(_, rowNum) => rowNum - }.sum + partitionData.map(_.rowNum).sum ) // Calculate split indices using cumulative sum rowCountsForEachBatch.scanLeft(0)(_ + _).drop(1).dropRight(1) @@ -479,13 +457,13 @@ object BatchWithPartitionDataUtils { if (remainingRows > 0) { // Add rows to the left partition, up to the remaining rows available val rowsToAddToLeft = Math.min(partitionRow.rowNum, remainingRows) - leftHalf += partitionRow.copy(rowNum = rowsToAddToLeft) + leftHalf += new PartitionRowData(partitionRow.rowValue, rowsToAddToLeft) rowsAddedToLeft += rowsToAddToLeft remainingRows -= rowsToAddToLeft if (remainingRows <= 0) { // Add remaining rows to the right partition val rowsToAddToRight = partitionRow.rowNum - rowsToAddToLeft - rightHalf += partitionRow.copy(rowNum = rowsToAddToRight) + rightHalf += new PartitionRowData(partitionRow.rowValue, rowsToAddToRight) rowsAddedToRight += rowsToAddToRight } } else { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/BucketJoinTwoSidesPrefetch.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/BucketJoinTwoSidesPrefetch.scala index a7acd9a1172..ae14942d66f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/BucketJoinTwoSidesPrefetch.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/BucketJoinTwoSidesPrefetch.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2025, NVIDIA CORPORATION. + * Copyright (c) 2025-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,7 +15,6 @@ */ package com.nvidia.spark.rapids -import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.exchange.Exchange import org.apache.spark.sql.rapids.GpuFileSourceScanExec @@ -28,7 +27,7 @@ import org.apache.spark.sql.rapids.GpuFileSourceScanExec * * NOTE: This is postShimPlanRule which should be applied after GpuOverrides. */ -object BucketJoinTwoSidesPrefetch extends Rule[SparkPlan] { +object BucketJoinTwoSidesPrefetch { // Traverse through the plan tree and enable IO prefetch for all GpuFileSourceScanExec // which are directly connected to this join node without any shuffle. @@ -44,7 +43,7 @@ object BucketJoinTwoSidesPrefetch extends Rule[SparkPlan] { } } - override def apply(plan: SparkPlan): SparkPlan = { + def apply(plan: SparkPlan): SparkPlan = { // Enable IO prefetch by a mutable operation on target nodes instead of re-generating // the plan tree. By doing so, it saves a lot of trouble. if (RapidsConf.BUCKET_JOIN_IO_PREFETCH.get(plan.conf)) { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala index feacd649a2e..8c72b7f9bea 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2025, NVIDIA CORPORATION. + * Copyright (c) 2019-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,7 +32,6 @@ import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.TaskAttemptContext import org.apache.spark.TaskContext -import org.apache.spark.internal.Logging import org.apache.spark.sql.rapids.{ColumnarWriteTaskStatsTracker, GpuWriteTaskStatsTracker} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.vectorized.ColumnarBatch @@ -81,7 +80,23 @@ abstract class ColumnarOutputWriter(context: TaskAttemptContext, debugDumpPath: Option[String], holdGpuBetweenBatches: Boolean = false, useAsyncWrite: Boolean = false, - rapidsFileIO: RapidsFileIO) extends HostBufferConsumer with Logging { + rapidsFileIO: RapidsFileIO) extends HostBufferConsumer { + private val log = org.slf4j.LoggerFactory.getLogger(getClass.getName.stripSuffix("$")) + + private def logDebug(msg: => String): Unit = { + if (log.isDebugEnabled) { + log.debug(msg) + } + } + + private def logWarning(msg: => String): Unit = { + log.warn(msg) + } + + private def logError(msg: => String, throwable: Throwable): Unit = { + log.error(msg, throwable) + } + // Length of the file written so far. This is used to track the size of the file private var fileLength: Long = 0L diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/CostBasedOptimizer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/CostBasedOptimizer.scala index cb79e329e7a..a9e248c72d9 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/CostBasedOptimizer.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/CostBasedOptimizer.scala @@ -20,7 +20,6 @@ import scala.collection.mutable.ListBuffer import com.nvidia.spark.rapids.shims.{GlobalLimitShims, QueryStageRowCountShims, SparkShimImpl} -import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression, GetStructField, WindowFrame, WindowSpecDefinition} import org.apache.spark.sql.catalyst.plans.{JoinType, LeftAnti, LeftSemi} import org.apache.spark.sql.execution.{GlobalLimitExec, LocalLimitExec, SparkPlan, TakeOrderedAndProjectExec, UnionExec} @@ -51,7 +50,13 @@ trait Optimizer { * data to the GPU just for a trivial projection and then have to move data back to the CPU on the * next step. */ -class CostBasedOptimizer extends Optimizer with Logging { +class CostBasedOptimizer extends Optimizer { + + @transient private lazy val log = org.slf4j.LoggerFactory.getLogger( + classOf[CostBasedOptimizer]) + + private def logTrace(msg: => String): Unit = if (log.isTraceEnabled) log.trace(msg) + /** * Walk the plan and determine CPU and GPU costs for each operator and then make decisions @@ -141,7 +146,7 @@ class CostBasedOptimizer extends Optimizer with Logging { // transition and reset the GPU cost if (operatorGpuCost + transitionCost > operatorCpuCost && !isExchangeOp(plan)) { // avoid transition and keep this operator on CPU - optimizations.append(AvoidTransition(plan)) + optimizations.append(new AvoidTransition(plan)) plan.costPreventsRunningOnGpu() // reset GPU cost totalGpuCost = totalCpuCost @@ -163,7 +168,7 @@ class CostBasedOptimizer extends Optimizer with Logging { if (canRunOnGpu(child) && !isExchangeOp(child) && childGpuTotal > childCpuCost) { // force this child plan back onto CPU - optimizations.append(ReplaceSection( + optimizations.append(new ReplaceSection( child, totalCpuCost, totalGpuCost)) child.recursiveCostPreventsRunningOnGpu() } @@ -193,7 +198,7 @@ class CostBasedOptimizer extends Optimizer with Logging { if (canRunOnGpu(plan) && !isExchangeOp(plan)) { // this plan would have been on GPU so we move it and onto CPU and recurse down // until we reach a part of the plan that is already on CPU and then stop - optimizations.append(ReplaceSection(plan, totalCpuCost, totalGpuCost)) + optimizations.append(new ReplaceSection(plan, totalCpuCost, totalGpuCost)) plan.recursiveCostPreventsRunningOnGpu() // reset the costs because this section of the plan was not moved to GPU totalGpuCost = totalCpuCost @@ -492,15 +497,15 @@ object RowCountPlanVisitor { sealed abstract class Optimization -case class AvoidTransition[INPUT <: SparkPlan](plan: SparkPlanMeta[INPUT]) extends Optimization { +class AvoidTransition[INPUT <: SparkPlan](val plan: SparkPlanMeta[INPUT]) extends Optimization { override def toString: String = s"It is not worth moving to GPU for operator: " + s"${Explain.format(plan)}" } -case class ReplaceSection[INPUT <: SparkPlan]( - plan: SparkPlanMeta[INPUT], - totalCpuCost: Double, - totalGpuCost: Double) extends Optimization { +class ReplaceSection[INPUT <: SparkPlan]( + val plan: SparkPlanMeta[INPUT], + val totalCpuCost: Double, + val totalGpuCost: Double) extends Optimization { override def toString: String = s"It is not worth keeping this section on GPU; " + s"gpuCost=$totalGpuCost, cpuCost=$totalCpuCost:\n${Explain.format(plan)}" } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DeviceMemoryEventHandler.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DeviceMemoryEventHandler.scala index f4723a7ff0f..f1a61d5d563 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DeviceMemoryEventHandler.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DeviceMemoryEventHandler.scala @@ -24,7 +24,6 @@ import ai.rapids.cudf.{Cuda, Rmm, RmmEventHandler} import com.nvidia.spark.rapids.spill.SpillableDeviceStore import com.sun.management.HotSpotDiagnosticMXBean -import org.apache.spark.internal.Logging /** * RMM event handler to trigger spilling from the device memory store. @@ -36,7 +35,23 @@ import org.apache.spark.internal.Logging class DeviceMemoryEventHandler( store: SpillableDeviceStore, oomDumpDir: Option[String], - maxFailedOOMRetries: Int) extends RmmEventHandler with Logging { + maxFailedOOMRetries: Int) extends RmmEventHandler { + private val log = org.slf4j.LoggerFactory.getLogger(getClass.getName.stripSuffix("$")) + + private def logInfo(msg: => String): Unit = { + if (log.isInfoEnabled) { + log.info(msg) + } + } + + private def logWarning(msg: => String): Unit = { + log.warn(msg) + } + + private def logError(msg: => String, throwable: Throwable): Unit = { + log.error(msg, throwable) + } + // Flag that ensures we dump stack traces once and not for every allocation // failure. The assumption is that unhandled allocations will be fatal diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/FoldLocalAggregate.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/FoldLocalAggregate.scala index 49600ee98f7..0a0359d7bbf 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/FoldLocalAggregate.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/FoldLocalAggregate.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2025, NVIDIA CORPORATION. + * Copyright (c) 2025-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,9 +15,7 @@ */ package com.nvidia.spark.rapids -import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions.aggregate._ -import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.aggregate._ @@ -28,8 +26,16 @@ import org.apache.spark.sql.execution.aggregate._ * redistribute data before final aggregate. The Local Aggregate may emerge under certain * circumstance, such as the BucketScan Spec fully matches the groupBy keys. */ -object FoldLocalAggregate extends Rule[SparkPlan] { - override def apply(plan: SparkPlan): SparkPlan = { +object FoldLocalAggregate { + private val log = org.slf4j.LoggerFactory.getLogger(FoldLocalAggregate.getClass) + + private def logError(msg: => String): Unit = { + if (log.isErrorEnabled) { + log.error(msg) + } + } + + def apply(plan: SparkPlan): SparkPlan = { plan.transform { case p@LocalAggregatePattern(finalAgg: BaseAggregateExec, partAgg: BaseAggregateExec) => // Spark eliminates the filter for the aggExpressions in Final mode. So, we need to copy @@ -84,7 +90,16 @@ object FoldLocalAggregate extends Rule[SparkPlan] { * The LocalAggregate can be emerged regardless HashAggregateExec, SortAggregateExec or * ObjectHashAggregateExec. */ -object LocalAggregatePattern extends Logging { +object LocalAggregatePattern { + private val log = org.slf4j.LoggerFactory.getLogger(LocalAggregatePattern.getClass) + + private def logError(msg: => String): Unit = { + if (log.isErrorEnabled) { + log.error(msg) + } + } + + def unapply(plan: SparkPlan): Option[(BaseAggregateExec, BaseAggregateExec)] = { plan match { case hashAgg: HashAggregateExec diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala index ae870049a45..0bb1e276aa6 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala @@ -31,7 +31,6 @@ import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion import com.nvidia.spark.rapids.shims.AggregationTagging import org.apache.spark.TaskContext -import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeReference, AttributeSeq, AttributeSet, Expression, ExprId, If, NamedExpression, SortOrder} @@ -48,7 +47,15 @@ import org.apache.spark.sql.rapids.execution.{GpuBatchSubPartitioner, GpuShuffle import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.ColumnarBatch -object AggregateUtils extends Logging { +object AggregateUtils { + private val log = org.slf4j.LoggerFactory.getLogger(getClass.getName.stripSuffix("$")) + + private def logDebug(msg: => String): Unit = { + if (log.isDebugEnabled) { + log.debug(msg) + } + } + private val aggs = List("min", "max", "avg", "sum", "count", "first", "last") @@ -308,40 +315,20 @@ object AggregateUtils extends Logging { } /** Utility class to hold all of the metrics related to hash aggregation */ -case class GpuHashAggregateMetrics( - numOutputRows: GpuMetric, - numOutputBatches: GpuMetric, - numTasksRepartitioned: GpuMetric, - numTasksSkippedAgg: GpuMetric, - opTime: GpuMetric, - computeAggTime: GpuMetric, - concatTime: GpuMetric, - sortTime: GpuMetric, - repartitionTime: GpuMetric, - numAggOps: GpuMetric, - numPreSplits: GpuMetric, - singlePassTasks: GpuMetric, - heuristicTime: GpuMetric) { -} - -/** Utility class to convey information on the aggregation modes being used */ -case class AggregateModeInfo( - uniqueModes: Seq[AggregateMode], - hasPartialMode: Boolean, - hasPartialMergeMode: Boolean, - hasFinalMode: Boolean, - hasCompleteMode: Boolean) - -object AggregateModeInfo { - def apply(uniqueModes: Seq[AggregateMode]): AggregateModeInfo = { - AggregateModeInfo( - uniqueModes = uniqueModes, - hasPartialMode = uniqueModes.contains(Partial), - hasPartialMergeMode = uniqueModes.contains(PartialMerge), - hasFinalMode = uniqueModes.contains(Final), - hasCompleteMode = uniqueModes.contains(Complete) - ) - } +class GpuHashAggregateMetrics( + val numOutputRows: GpuMetric, + val numOutputBatches: GpuMetric, + val numTasksRepartitioned: GpuMetric, + val numTasksSkippedAgg: GpuMetric, + val opTime: GpuMetric, + val computeAggTime: GpuMetric, + val concatTime: GpuMetric, + val sortTime: GpuMetric, + val repartitionTime: GpuMetric, + val numAggOps: GpuMetric, + val numPreSplits: GpuMetric, + val singlePassTasks: GpuMetric, + val heuristicTime: GpuMetric) extends Serializable { } /** @@ -619,7 +606,8 @@ class AggHelper( } } -object GpuAggregateIterator extends Logging { +object GpuAggregateIterator { + /** * @note abstracted away for a unit test.. * @param helper @@ -753,9 +741,9 @@ object GpuAggFirstPassIterator { // * boundFinalProjections: on merged batches, finalize aggregates // (GpuAverage => CudfSum/CudfCount) // * boundResultReferences: project the result expressions Spark expects in the output. -case class BoundExpressionsModeAggregates( - boundFinalProjections: Option[Seq[GpuExpression]], - boundResultReferences: Seq[Expression]) +class BoundExpressionsModeAggregates( + val boundFinalProjections: Option[Seq[GpuExpression]], + val boundResultReferences: Seq[Expression]) object GpuAggFinalPassIterator { @@ -805,7 +793,7 @@ object GpuAggFinalPassIterator { } else { GpuBindReferences.bindGpuReferences(resultExpressions, groupingAttributes, metrics) } - BoundExpressionsModeAggregates( + new BoundExpressionsModeAggregates( boundFinalProjections, boundResultReferences) } @@ -909,7 +897,7 @@ class GpuMergeAggregateIterator( localInputRowsCount: LocalGpuMetric, allMetrics: Map[String, GpuMetric] ) - extends Iterator[ColumnarBatch] with AutoCloseable with Logging { + extends Iterator[ColumnarBatch] with AutoCloseable with RapidsLocalLog { private[this] val isReductionOnly = groupingExpressions.isEmpty private[this] val targetMergeBatchSize = computeTargetMergeBatchSize(configuredTargetBatchSize) @@ -978,7 +966,7 @@ class GpuMergeAggregateIterator( s"$firstPassReductionRatioEstimate") // if so, skip any aggregation, return the origin batch directly - realIter = Some(ConcatIterator(firstPassIter, configuredTargetBatchSize)) + realIter = Some(new ConcatIterator(firstPassIter, configuredTargetBatchSize)) metrics.numTasksSkippedAgg += 1 return realIter.get.next() } else { @@ -1011,7 +999,7 @@ class GpuMergeAggregateIterator( metrics.numTasksRepartitioned += 1 } - realIter = Some(ConcatIterator( + realIter = Some(new ConcatIterator( new CloseableBufferedIterator(buildBucketIterator()), configuredTargetBatchSize)) realIter.get.next() } @@ -1032,7 +1020,7 @@ class GpuMergeAggregateIterator( new AggHelper(inputAttributes, groupingExpressions, aggregateExpressions, forceMerge = true, conf, isSorted = false, allMetrics) - private case class ConcatIterator( + private class ConcatIterator( input: CloseableBufferedIterator[SpillableColumnarBatch], targetSize: Long) extends Iterator[ColumnarBatch] { @@ -1061,7 +1049,7 @@ class GpuMergeAggregateIterator( } } - private case class RepartitionAggregateIterator(opTime: GpuMetric) + private class RepartitionAggregateIterator(opTime: GpuMetric) extends Iterator[SpillableColumnarBatch] { batchesByBucket = batchesByBucket.filter(_.size() > 0) @@ -1098,7 +1086,7 @@ class GpuMergeAggregateIterator( /** Build an iterator merging aggregated batches in each bucket. */ private def buildBucketIterator(): Iterator[SpillableColumnarBatch] = { - bucketIter = Some(RepartitionAggregateIterator(opTime = metrics.opTime)) + bucketIter = Some(new RepartitionAggregateIterator(metrics.opTime)) bucketIter.get } @@ -1952,7 +1940,7 @@ case class GpuHashAggregateExec( allowSinglePassAgg: Boolean, allowNonFullyAggregatedOutput: Boolean, skipAggPassReductionRatio: Double -) extends GpuPartitioningPreservingUnaryExecNode with GpuExec with Logging { +) extends GpuPartitioningPreservingUnaryExecNode with GpuExec { // lifted directly from `BaseAggregateExec.inputAttributes`, edited comment. def inputAttributes: Seq[Attribute] = @@ -1998,7 +1986,7 @@ case class GpuHashAggregateExec( } override def internalDoExecuteColumnar(): RDD[ColumnarBatch] = { - val aggMetrics = GpuHashAggregateMetrics( + val aggMetrics = new GpuHashAggregateMetrics( numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS), numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES), numTasksRepartitioned = gpuLongMetric(NUM_TASKS_REPARTITIONED), @@ -2019,7 +2007,7 @@ case class GpuHashAggregateExec( val aggregateExprs = aggregateExpressions val aggregateAttrs = aggregateAttributes val resultExprs = resultExpressions - val modeInfo = AggregateModeInfo(uniqueModes) + val modeInfo = AggregateModeInfo.from(uniqueModes) val targetBatchSize = configuredTargetBatchSize val rdd = child.executeColumnar() @@ -2213,7 +2201,7 @@ class DynamicGpuPartialAggregateIterator( inputIter } else { val sorter = new GpuSorter(ordering, inputAttrs, allMetrics) - GpuOutOfCoreSortIterator(inputIter, + new GpuOutOfCoreSortIterator(inputIter, sorter, configuredTargetBatchSize, opTime = metrics.opTime, diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBoundAttribute.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBoundAttribute.scala index 3c5b9f26872..6f96496e8fd 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBoundAttribute.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBoundAttribute.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2025, NVIDIA CORPORATION. + * Copyright (c) 2019-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,7 +19,6 @@ package com.nvidia.spark.rapids import ai.rapids.cudf.ast import com.nvidia.spark.rapids.shims.ShimExpression -import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSeq, Expression, ExprId, NamedExpression} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.rapids.catalyst.expressions.GpuEquivalentExpressions @@ -40,7 +39,8 @@ trait GpuBind { def bind(input: AttributeSeq): GpuExpression } -object GpuBindReferences extends Logging { +object GpuBindReferences { + private val log = org.slf4j.LoggerFactory.getLogger(getClass.getName.stripSuffix("$")) /** * An alternative to `Expression.transformDown`, but when a result is returned by `rule` it is @@ -161,8 +161,9 @@ object GpuBindReferences extends Logging { case (es: Seq[Expression], is: AttributeSeq) => es.map(GpuBindReferences.bindGpuReferenceNoMetrics(_, is)).toList } - logTrace { - "INPUT:\n" + + if (log.isTraceEnabled) { + log.trace( + "INPUT:\n" + expressions.zipWithIndex.map { case (expr, idx) => s"\t$idx:\t$expr" @@ -175,11 +176,11 @@ object GpuBindReferences extends Logging { case (expr, idx) => s"\t\t$idx:\t$expr" }.mkString("\n") - }.mkString("\n") + }.mkString("\n")) } - GpuTieredProject(tiered) + new GpuTieredProject(tiered) } else { - GpuTieredProject(Seq(GpuBindReferences.bindGpuReferencesNoMetrics(expressions, input))) + new GpuTieredProject(Seq(GpuBindReferences.bindGpuReferencesNoMetrics(expressions, input))) } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala index 1328c3b7a1f..4847738b3da 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala @@ -204,6 +204,9 @@ object CastOptions { override val useHexFormatForBinary: Boolean = true } + + private def defaultNullifyOverflows: Boolean = + CastTimeToIntShim.ifNullifyOverflows } /** @@ -223,7 +226,7 @@ class CastOptions( legacyCastComplexTypesToString: Boolean, ansiMode: Boolean, stringToDateAnsiMode: Boolean, - val nullifyOverflows: Boolean = CastTimeToIntShim.ifNullifyOverflows, + val nullifyOverflows: Boolean = CastOptions.defaultNullifyOverflows, val castToJsonString: Boolean = false, val ignoreNullFieldsInStructs: Boolean = true, val timeZoneId: Option[String] = Option.empty[String]) extends Serializable { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala index ffa84c5d5d7..bfced5eb03e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala @@ -27,7 +27,6 @@ import com.nvidia.spark.rapids.jni.GpuSplitAndRetryOOM import com.nvidia.spark.rapids.shims.{ShimExpression, ShimUnaryExecNode} import org.apache.spark.TaskContext -import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, SortOrder} @@ -118,7 +117,7 @@ object CoalesceGoal { a // They are equal so it does not matter } else { // Nothing is the same so there is no guarantee - BatchedByKey(Seq.empty)(Seq.empty) + new BatchedByKey(Seq.empty)(Seq.empty) } case (TargetSize(aSize), TargetSize(bSize)) if aSize > bSize => a case _ => b @@ -144,9 +143,9 @@ object CoalesceGoal { case (_, _: RequireSingleBatchLike) => false case (_: BatchedByKey, _: TargetSize) => true case (_: TargetSize, _: BatchedByKey) => false - case (BatchedByKey(aOrder), BatchedByKey(bOrder)) => - aOrder.length == bOrder.length && - aOrder.zip(bOrder).forall { + case (aGoal: BatchedByKey, bGoal: BatchedByKey) => + aGoal.gpuOrder.length == bGoal.gpuOrder.length && + aGoal.gpuOrder.zip(bGoal.gpuOrder).forall { case (a, b) => a.satisfies(b) } case (TargetSize(foundSize), TargetSize(requiredSize)) => foundSize >= requiredSize @@ -236,10 +235,28 @@ case class TargetSize(override val targetSizeBytes: Long) * @param gpuOrder the GPU keys that should be used for batching. * @param cpuOrder the CPU keys that should be used for batching. */ -case class BatchedByKey(gpuOrder: Seq[SortOrder])(val cpuOrder: Seq[SortOrder]) - extends CoalesceGoal { +class BatchedByKey(val gpuOrder: Seq[SortOrder])(val cpuOrder: Seq[SortOrder]) + extends CoalesceGoal with Serializable { require(gpuOrder.size == cpuOrder.size) + override def canEqual(that: Any): Boolean = that.isInstanceOf[BatchedByKey] + + override def productArity: Int = 1 + + override def productElement(n: Int): Any = n match { + case 0 => gpuOrder + case _ => throw new IndexOutOfBoundsException(n.toString) + } + + override def productPrefix: String = "BatchedByKey" + + override def equals(other: Any): Boolean = other match { + case that: BatchedByKey => that.canEqual(this) && gpuOrder == that.gpuOrder + case _ => false + } + + override def hashCode(): Int = scala.runtime.ScalaRunTime._hashCode(this) + override def otherCopyArgs: Seq[AnyRef] = cpuOrder :: Nil override def children: Seq[Expression] = gpuOrder @@ -267,7 +284,20 @@ abstract class AbstractGpuCoalesceIterator( streamTimeOrNoop: GpuMetric, concatTime: GpuMetric, opTime: GpuMetric, - opName: String) extends Iterator[ColumnarBatch] with Logging { + opName: String) extends Iterator[ColumnarBatch] { + + private val log = org.slf4j.LoggerFactory.getLogger(getClass.getName.stripSuffix("$")) + + protected def logDebug(msg: => String): Unit = { + if (log.isDebugEnabled) { + log.debug(msg) + } + } + + protected def logWarning(msg: => String): Unit = { + log.warn(msg) + } + val streamTime = streamTimeOrNoop match { case NoopMetric => new LocalGpuMetric @@ -507,7 +537,7 @@ abstract class AbstractGpuCoalesceIterator( case RequireSingleBatchWithFilter(filterExpression) => if (inputFilterTier.isEmpty) { // We are going to enter the null-filtering mode - val filterTier = GpuTieredProject(Seq(Seq(filterExpression))) + val filterTier = new GpuTieredProject(Seq(Seq(filterExpression))) // 1) Filter what we had already stored, and the rows number should // be within the limit. // Re-calculate the filtered rows number and size. @@ -693,7 +723,7 @@ abstract class AbstractGpuCoalesceIterator( throw new GpuSplitAndRetryOOM(s"Cannot split a sequence of $numBatches batches") } val res = it.splitAt(numBatches / 2) - Seq(BatchesToCoalesce(res._1), BatchesToCoalesce(res._2)) + Seq(new BatchesToCoalesce(res._1), new BatchesToCoalesce(res._2)) } } } @@ -706,7 +736,7 @@ abstract class AbstractGpuCoalesceIterator( * instances in `batches` * @param batches a sequence of `SpillableColumnarBatch` to manage. */ -case class BatchesToCoalesce(batches: Array[SpillableColumnarBatch]) +class BatchesToCoalesce(val batches: Array[SpillableColumnarBatch]) extends AutoCloseable { override def close(): Unit = { batches.safeClose() @@ -766,7 +796,7 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch], } override def getCoalesceRetryIterator: Iterator[ColumnarBatch] = { - val candidates = BatchesToCoalesce(batches.clone().toArray) + val candidates = new BatchesToCoalesce(batches.clone().toArray) batches.clear() withRetry(candidates, splitBatchesToCoalesceFn) { attempt: BatchesToCoalesce => concatBatches(attempt.batches) @@ -888,7 +918,7 @@ class GpuCompressionAwareCoalesceIterator( } override def getCoalesceRetryIterator: Iterator[ColumnarBatch] = { - val candidates = BatchesToCoalesce(batches.clone().toArray) + val candidates = new BatchesToCoalesce(batches.clone().toArray) batches.clear() withRetry(candidates, splitBatchesToCoalesceFn) { attempt: BatchesToCoalesce => concatBatches(attempt.batches) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCpuBridgeExpression.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCpuBridgeExpression.scala index 76bfcea0970..7d4eab33694 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCpuBridgeExpression.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCpuBridgeExpression.scala @@ -21,7 +21,6 @@ import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion import com.nvidia.spark.rapids.shims.ShimExpression -import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{AttributeSeq, Expression} import org.apache.spark.sql.rapids.BridgeUnsafeProjection @@ -47,8 +46,16 @@ case class GpuCpuBridgeExpression( gpuInputs: Seq[Expression], cpuExpression: Expression, outputDataType: DataType, - outputNullable: Boolean) extends GpuExpression with ShimExpression - with Logging with GpuBind with GpuMetricsInjectable { + outputNullable: Boolean) extends GpuExpression with ShimExpression + with GpuBind with GpuMetricsInjectable { + private val log = org.slf4j.LoggerFactory.getLogger(getClass.getName.stripSuffix("$")) + + private def logDebug(msg: => String): Unit = { + if (log.isDebugEnabled) { + log.debug(msg) + } + } + override def children: Seq[Expression] = gpuInputs ++ Seq(cpuExpression) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDataWritingCommandExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDataWritingCommandExec.scala index 0f4af3f0ca2..a6d49b8bd14 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDataWritingCommandExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDataWritingCommandExec.scala @@ -21,43 +21,32 @@ import java.net.URI import com.nvidia.spark.rapids.RapidsConf.LORE_SKIP_DUMPING_PLAN import com.nvidia.spark.rapids.lore.{GpuLore, GpuLoreDumpExec} import com.nvidia.spark.rapids.lore.GpuLore.{loreIdOf, LORE_DUMP_PATH_TAG, LORE_DUMP_RDD_TAG} -import com.nvidia.spark.rapids.shims.{ShimUnaryCommand, ShimUnaryExecNode} +import com.nvidia.spark.rapids.shims.{ShimDataWritingCommand, ShimUnaryExecNode} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{Row, SaveMode} +import org.apache.spark.sql.SaveMode import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} -import org.apache.spark.sql.execution.command.DataWritingCommand import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.rapids.GpuWriteJobStatsTracker import org.apache.spark.sql.rapids.shims.RapidsErrorUtils -import org.apache.spark.sql.rapids.shims.TrampolineConnectShims.SparkSession import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.SerializableConfiguration /** * An extension of `DataWritingCommand` that allows columnar execution. */ -trait GpuDataWritingCommand extends DataWritingCommand with ShimUnaryCommand { +trait GpuDataWritingCommand extends ShimDataWritingCommand { lazy val basicMetrics: Map[String, GpuMetric] = GpuWriteJobStatsTracker.basicMetrics lazy val taskMetrics: Map[String, GpuMetric] = GpuWriteJobStatsTracker.taskMetrics override lazy val metrics: Map[String, SQLMetric] = GpuMetric.unwrap(basicMetrics ++ taskMetrics) - def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = { - Arm.withResource(runColumnar(sparkSession, child)) { batches => - assert(batches.isEmpty) - } - Seq.empty[Row] - } - - def runColumnar(sparkSession: SparkSession, child: SparkPlan): Seq[ColumnarBatch] - def gpuWriteJobStatsTracker( hadoopConf: Configuration): GpuWriteJobStatsTracker = { val serializableHadoopConf = new SerializableConfiguration(hadoopConf) @@ -122,7 +111,7 @@ case class GpuDataWritingCommandExec(cmd: GpuDataWritingCommand, child: SparkPla dumpLoreMetaInfo() // Execute the command with LoRE dumping if needed val childWithDumping = dumpLoreRDD(child) - cmd.runColumnar(sparkSession, childWithDumping) + cmd.runColumnarFromAny(sparkSession, childWithDumping) } override def output: Seq[Attribute] = cmd.output diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala index 921e0ab8e86..23cf056c813 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala @@ -26,6 +26,7 @@ import org.apache.spark.{Partition, TaskContext} import org.apache.spark.internal.Logging import org.apache.spark.rapids.LocationPreservingMapPartitionsRDD import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression, ExprId} import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.trees.TreeNodeTag @@ -35,8 +36,6 @@ import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec} import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.rapids.GpuTaskMetrics import org.apache.spark.sql.rapids.execution.{GpuCustomShuffleReaderExec} -import org.apache.spark.sql.rapids.shims.SparkSessionUtils -import org.apache.spark.sql.rapids.shims.TrampolineConnectShims.SparkSession import org.apache.spark.sql.vectorized.ColumnarBatch /** @@ -133,6 +132,12 @@ trait RapidsLocalLog { } object GpuExec { + @transient private[this] lazy val sparkPlanSessionMethod = + classOf[SparkPlan].getMethod("session") + + def sessionFromPlan(plan: SparkPlan): SparkSession = + sparkPlanSessionMethod.invoke(plan).asInstanceOf[SparkSession] + def outputBatching(sp: SparkPlan): CoalesceGoal = sp match { case gpu: GpuExec => gpu.outputBatching case _ => null @@ -148,7 +153,7 @@ trait GpuExec extends SparkPlan with Logging { RapidsConf.OP_TIME_TRACKING_RDD_ENABLED.get(conf) def sparkSession: SparkSession = { - SparkSessionUtils.sessionFromPlan(this) + GpuExec.sessionFromPlan(this) } /** diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuHashPartitioningBase.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuHashPartitioningBase.scala index 226ed0d381b..d536cb1980b 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuHashPartitioningBase.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuHashPartitioningBase.scala @@ -20,7 +20,6 @@ import ai.rapids.cudf.{DType, PartitionedTable} import com.nvidia.spark.rapids.Arm.withResource import com.nvidia.spark.rapids.shims.ShimExpression -import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions.{Expression, HiveHash, Murmur3Hash} import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.rapids.{GpuHashExpression, GpuHiveHash, GpuMurmur3Hash, GpuPmod} @@ -95,7 +94,16 @@ abstract class GpuHashPartitioningBase(expressions: Seq[Expression], numPartitio def partitionIdExpression: GpuExpression = GpuPmod(hashFunc, GpuLiteral(numPartitions)) } -object GpuHashPartitioningBase extends Logging { +object GpuHashPartitioningBase { + + private val log = org.slf4j.LoggerFactory.getLogger(GpuHashPartitioningBase.getClass) + + private def logDebug(msg: => String): Unit = { + if (log.isDebugEnabled) { + log.debug(msg) + } + } + val DEFAULT_HASH_SEED: Int = 42 @@ -117,7 +125,7 @@ object GpuHashPartitioningBase extends Logging { hashMode = hashModeMethod.invoke(cpuHp) match { case m if m == classOf[Murmur3Hash] => Murmur3Mode case h if h == classOf[HiveHash] => HiveMode - case o => UnsupportedMode(o.asInstanceOf[Class[_]].getSimpleName) + case o => new UnsupportedMode(o.asInstanceOf[Class[_]].getSimpleName) } logDebug(s"Found hash function '$hashMode' from CPU hash partitioning.") } catch { @@ -134,6 +142,6 @@ sealed trait HashMode extends Serializable case object Murmur3Mode extends HashMode case object HiveMode extends HashMode -case class UnsupportedMode(modeName: String) extends HashMode { +class UnsupportedMode(val modeName: String) extends HashMode { override def toString: String = modeName } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRangePartitioner.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRangePartitioner.scala index c7406227f79..5da50517dea 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRangePartitioner.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRangePartitioner.scala @@ -27,7 +27,7 @@ import org.apache.spark.rdd.{PartitionPruningRDD, RDD} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering -import org.apache.spark.sql.rapids.execution.TrampolineUtil +import org.apache.spark.sql.rapids.shims.DataTypeUtilsShim import org.apache.spark.sql.types.{DataType, IntegerType} import org.apache.spark.sql.vectorized.ColumnarBatch @@ -173,7 +173,7 @@ case class GpuRangePartitioner( sorter: GpuSorter) extends GpuExpression with ShimExpression with GpuPartitioning { private lazy val converters = new GpuRowToColumnConverter( - TrampolineUtil.fromAttributes(sorter.projectedBatchSchema)) + DataTypeUtilsShim.fromAttributes(sorter.projectedBatchSchema)) override def nullable: Boolean = false override def dataType: DataType = IntegerType @@ -189,7 +189,7 @@ case class GpuRangePartitioner( // Don't make this retry-block avoiding nested try-blocks // from computeBoundsAndCloseWithRetry withResource(converters.convertBatch(rangeBounds, - TrampolineUtil.fromAttributes(sorter.projectedBatchSchema))) { ranges => + DataTypeUtilsShim.fromAttributes(sorter.projectedBatchSchema))) { ranges => withResource(sorter.appendProjectedColumns(cb)) { withExtraColumns => sorter.lowerBound(ranges, withExtraColumns) } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala index 8e61fa1c9b9..405571ba1ab 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala @@ -23,7 +23,6 @@ import com.nvidia.spark.rapids.shims.{CudfUnsafeRow, GpuTypeShims, ShimUnaryExec import org.apache.spark.TaskContext import org.apache.spark.broadcast.Broadcast -import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, SortOrder, SpecializedGetters, UnsafeProjection, UnsafeRow} @@ -107,22 +106,22 @@ object GpuRowToColumnConverter { // NOT SUPPORTED YET // case CalendarIntervalType => CalendarConverter case (at: ArrayType, true) => - ArrayConverter(getConverterForType(at.elementType, at.containsNull)) + new ArrayConverter(getConverterForType(at.elementType, at.containsNull)) case (at: ArrayType, false) => - NotNullArrayConverter(getConverterForType(at.elementType, at.containsNull)) + new NotNullArrayConverter(getConverterForType(at.elementType, at.containsNull)) case (st: StructType, true) => - StructConverter(st.fields.map(getConverterFor)) + new StructConverter(st.fields.map(getConverterFor)) case (st: StructType, false) => - NotNullStructConverter(st.fields.map(getConverterFor)) + new NotNullStructConverter(st.fields.map(getConverterFor)) case (dt: DecimalType, true) => new DecimalConverter(dt.precision, dt.scale) case (dt: DecimalType, false) => new NotNullDecimalConverter(dt.precision, dt.scale) case (MapType(k, v, vcn), true) => - MapConverter(getConverterForType(k, nullable = false), + new MapConverter(getConverterForType(k, nullable = false), getConverterForType(v, vcn)) case (MapType(k, v, vcn), false) => - NotNullMapConverter(getConverterForType(k, nullable = false), + new NotNullMapConverter(getConverterForType(k, nullable = false), getConverterForType(v, vcn)) case (NullType, _) => // nullable=false appears only as a synthetic child of empty nested @@ -404,7 +403,7 @@ object GpuRowToColumnConverter { ret + OFFSET } - private case class MapConverter( + private class MapConverter( keyConverter: TypeConverter, valueConverter: TypeConverter) extends TypeConverter { override def append(row: SpecializedGetters, @@ -420,7 +419,7 @@ object GpuRowToColumnConverter { override def getNullSize: Double = OFFSET + VALIDITY } - private case class NotNullMapConverter( + private class NotNullMapConverter( keyConverter: TypeConverter, valueConverter: TypeConverter) extends TypeConverter { override def append(row: SpecializedGetters, @@ -462,7 +461,7 @@ object GpuRowToColumnConverter { ret + OFFSET } - private case class ArrayConverter(childConverter: TypeConverter) + private class ArrayConverter(childConverter: TypeConverter) extends TypeConverter { override def append(row: SpecializedGetters, column: Int, builder: RapidsHostColumnBuilder): Double = { @@ -477,7 +476,7 @@ object GpuRowToColumnConverter { override def getNullSize: Double = OFFSET + VALIDITY } - private case class NotNullArrayConverter(childConverter: TypeConverter) + private class NotNullArrayConverter(childConverter: TypeConverter) extends TypeConverter { override def append(row: SpecializedGetters, column: Int, builder: RapidsHostColumnBuilder): Double = { @@ -501,7 +500,7 @@ object GpuRowToColumnConverter { ret } - private case class StructConverter( + private class StructConverter( childConverters: Array[TypeConverter]) extends TypeConverter { override def append(row: SpecializedGetters, column: Int, @@ -518,7 +517,7 @@ object GpuRowToColumnConverter { override def getNullSize: Double = childConverters.map(_.getNullSize).sum + VALIDITY } - private case class NotNullStructConverter( + private class NotNullStructConverter( childConverters: Array[TypeConverter]) extends TypeConverter { override def append(row: SpecializedGetters, column: Int, @@ -756,7 +755,17 @@ class RowToColumnarIterator( } -object GeneratedInternalRowToCudfRowIterator extends Logging { +object GeneratedInternalRowToCudfRowIterator { + private val log = org.slf4j.LoggerFactory.getLogger( + GeneratedInternalRowToCudfRowIterator.getClass) + + private def logDebug(msg: => String): Unit = { + if (log.isDebugEnabled) { + log.debug(msg) + } + } + + def apply(input: Iterator[InternalRow], schema: Array[Attribute], goal: CoalesceSizeGoal, diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRunnableCommandExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRunnableCommandExec.scala index 644acd19fbb..2817d7c710f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRunnableCommandExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRunnableCommandExec.scala @@ -112,7 +112,7 @@ case class GpuRunnableCommandExec(cmd: GpuRunnableCommand, child: SparkPlan) override lazy val allMetrics: Map[String, GpuMetric] = GpuMetric.wrap(cmd.metrics) private lazy val sideEffectResult: Seq[ColumnarBatch] = - cmd.runColumnar(sparkSession, child) + cmd.runColumnar(sparkSession.asInstanceOf[GpuSparkSession], child) override def output: Seq[Attribute] = cmd.output diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala index a51aa1c1de7..fd15ed1e858 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2025, NVIDIA CORPORATION. + * Copyright (c) 2019-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,7 +29,6 @@ import com.nvidia.spark.rapids.jni.{RmmSpark, TaskPriority} import com.nvidia.spark.rapids.metrics.GpuBubbleTimerManager import org.apache.spark.TaskContext -import org.apache.spark.internal.Logging import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.rapids.GpuTaskMetrics @@ -49,18 +48,13 @@ case object SemaphoreAcquired extends TryAcquireResult * @param numWaitingTasks the number of tasks waiting at the time the request was made. * Note that this can change very quickly. */ -case class AcquireFailed(numWaitingTasks: Int) extends TryAcquireResult - -private object GpuTaskMemoryEstimator { - private val TIME_WINDOW: Double = TimeUnit.MILLISECONDS.toNanos(100).toDouble -} +class AcquireFailed(val numWaitingTasks: Int) extends TryAcquireResult with Serializable class GpuTaskMemoryEstimator(val stageId: Int, val taskId: Long, val defaultEstimate: Long, val allowDynamicUpdate: Boolean) { - import GpuTaskMemoryEstimator._ - + private val TIME_WINDOW: Double = TimeUnit.MILLISECONDS.toNanos(100).toDouble private val startTimeNanos: Long = System.nanoTime() private var totalTimeLost: Long = 0 private var maxMemory: Long = 0 @@ -318,7 +312,7 @@ object GpuSemaphore { */ private final class SemaphoreTaskInfo(val stageId: Int, val taskAttemptId: Long, memoryEstimator: GpuStageMemoryEstimator, - bubbleTimerMgr: GpuBubbleTimerManager) extends Logging { + bubbleTimerMgr: GpuBubbleTimerManager) { /** * This holds threads that are not on the GPU yet. Most of the time they are * blocked waiting for the semaphore to let them on, but it may hold one @@ -509,7 +503,23 @@ private final class SemaphoreTaskInfo(val stageId: Int, val taskAttemptId: Long, } } -private final class GpuSemaphore(val maxConcurrentGpuTasksLimit: Int) extends Logging { +private final class GpuSemaphore(val maxConcurrentGpuTasksLimit: Int) { + private val log = org.slf4j.LoggerFactory.getLogger(getClass.getName.stripSuffix("$")) + + private def logDebug(msg: => String): Unit = { + if (log.isDebugEnabled) { + log.debug(msg) + } + } + + private def logWarning(msg: => String): Unit = { + log.warn(msg) + } + + private def logWarning(msg: => String, throwable: Throwable): Unit = { + log.warn(msg, throwable) + } + import GpuSemaphore._ @@ -572,7 +582,7 @@ private final class GpuSemaphore(val maxConcurrentGpuTasksLimit: Int) extends Lo numWaiting += 1 } } - AcquireFailed(numWaiting) + new AcquireFailed(numWaiting) } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala index e289ad3774b..a4c58f7fe29 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala @@ -24,7 +24,6 @@ import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit import com.nvidia.spark.rapids.shims.{GpuHashPartitioning, ShimBinaryExecNode} import org.apache.spark.TaskContext -import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} @@ -282,7 +281,12 @@ case class GpuShuffledHashJoinExec( } } -object GpuShuffledHashJoinExec extends Logging { +object GpuShuffledHashJoinExec { + + private val log = org.slf4j.LoggerFactory.getLogger(GpuShuffledHashJoinExec.getClass) + + private def logDebug(msg: => String): Unit = if (log.isDebugEnabled) log.debug(msg) + /** * Return the build data as a single ColumnarBatch when sub-partitioning is not enabled, * while as an iterator of ColumnarBatch when sub-partitioning is enabled. diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledSizedHashJoinExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledSizedHashJoinExec.scala index abc718d83f5..c48901b2d0a 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledSizedHashJoinExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledSizedHashJoinExec.scala @@ -156,14 +156,14 @@ object GpuShuffledSizedHashJoinExec { } /** Utility class to track information related to a join. */ - case class JoinInfo( - joinType: JoinType, - buildSide: GpuBuildSide, - buildIter: Iterator[ColumnarBatch], - buildSize: Long, - buildStats: Option[JoinBuildSideStats], - streamIter: Iterator[ColumnarBatch], - exprs: BoundJoinExprs) + class JoinInfo( + val joinType: JoinType, + val buildSide: GpuBuildSide, + val buildIter: Iterator[ColumnarBatch], + val buildSize: Long, + val buildStats: Option[JoinBuildSideStats], + val streamIter: Iterator[ColumnarBatch], + val exprs: BoundJoinExprs) /** * Trait to house common code for determining the ideal build/stream @@ -750,7 +750,7 @@ object GpuShuffledSymmetricHashJoinExec { val streamIter = new CollectTimeIterator(NvtxRegistry.FETCH_JOIN_STREAM, setupForJoin(streamQueue, rawStreamIter, exprs.streamTypes, gpuBatchSizeBytes, metrics), streamTime) - JoinInfo(joinType, buildSide, buildIter, buildSize, None, streamIter, exprs) + new JoinInfo(joinType, buildSide, buildIter, buildSize, None, streamIter, exprs) } } } @@ -889,12 +889,12 @@ object GpuShuffledAsymmetricHashJoinExec { if (streamRows <= Int.MaxValue && streamSize <= gpuBatchSizeBytes) { metrics(BUILD_DATA_SIZE).set(streamSize) val flippedSide = flipped(buildSide) - JoinInfo(joinType, flippedSide, streamIter, streamSize, None, baseBuildIter, + new JoinInfo(joinType, flippedSide, streamIter, streamSize, None, baseBuildIter, exprs.flipped(joinType, flippedSide, condition, leftOutput, rightOutput, metrics)) } else { val buildIter = addNullFilterIfNecessary(baseBuildIter, exprs.boundBuildKeys, exprs.buildSideNeedsNullFilter, metrics) - JoinInfo(joinType, buildSide, buildIter, buildSize, None, streamIter, exprs) + new JoinInfo(joinType, buildSide, buildIter, buildSize, None, streamIter, exprs) } } } @@ -923,14 +923,14 @@ object GpuShuffledAsymmetricHashJoinExec { metrics(BUILD_DATA_SIZE).set(buildSize) val buildIter = addNullFilterIfNecessary(baseBuildIter, exprs.boundBuildKeys, exprs.buildSideNeedsNullFilter, metrics) - JoinInfo(joinType, buildSide, buildIter, buildSize, None, streamIter, exprs) + new JoinInfo(joinType, buildSide, buildIter, buildSize, None, streamIter, exprs) } else { val buildBatch = getAsSingleBuildBatch(baseBuildIter, exprs, metrics) val buildIter = new SingleGpuColumnarBatchIterator(buildBatch) val buildStats = JoinBuildSideStats.fromBatch(buildBatch, exprs.boundBuildKeys) if (buildStats.streamMagnificationFactor < magnificationThreshold) { metrics(BUILD_DATA_SIZE).set(buildSize) - JoinInfo(joinType, buildSide, buildIter, buildSize, Some(buildStats), streamIter, + new JoinInfo(joinType, buildSide, buildIter, buildSize, Some(buildStats), streamIter, exprs) } else { // The natural build side is explosive, so check the natural stream side to see @@ -962,25 +962,26 @@ object GpuShuffledAsymmetricHashJoinExec { if (buildStats.streamMagnificationFactor < streamStats.streamMagnificationFactor) { metrics(BUILD_DATA_SIZE).set(buildSize) - JoinInfo(joinType, buildSide, buildIter, buildSize, Some(buildStats), + new JoinInfo(joinType, buildSide, buildIter, buildSize, Some(buildStats), singleStreamIter, exprs) } else { metrics(BUILD_DATA_SIZE).set(streamSize) val flippedSide = flipped(buildSide) - JoinInfo(joinType, flippedSide, singleStreamIter, streamSize, Some(streamStats), + new JoinInfo( + joinType, flippedSide, singleStreamIter, streamSize, Some(streamStats), buildIter, exprs.flipped(joinType, flippedSide, condition, leftOutput, rightOutput, metrics)) } } else { metrics(BUILD_DATA_SIZE).set(streamSize) val flippedSide = flipped(buildSide) - JoinInfo(joinType, flippedSide, streamBatchIter, streamSize, None, + new JoinInfo(joinType, flippedSide, streamBatchIter, streamSize, None, buildIter, exprs.flipped(joinType, flippedSide, condition, leftOutput, rightOutput, metrics)) } } else { metrics(BUILD_DATA_SIZE).set(buildSize) - JoinInfo(joinType, buildSide, buildIter, buildSize, Some(buildStats), + new JoinInfo(joinType, buildSide, buildIter, buildSize, Some(buildStats), new SpillableColumnarBatchQueueIterator(streamQueue, streamIter), exprs) } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortExec.scala index 9a79151d3cf..828fb32b72e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortExec.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2025, NVIDIA CORPORATION. + * Copyright (c) 2019-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{Distribution, OrderedDistri import org.apache.spark.sql.execution.{SortExec, SparkPlan} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.rapids.{GpuWriteJobStatsTracker, GpuWriteTaskStatsTracker} -import org.apache.spark.sql.rapids.execution.TrampolineUtil +import org.apache.spark.sql.rapids.shims.DataTypeUtilsShim import org.apache.spark.sql.vectorized.ColumnarBatch sealed trait SortExecType extends Serializable @@ -141,12 +141,12 @@ case class GpuSortExec( tcs.map(_.newTaskInstance().asInstanceOf[GpuWriteTaskStatsTracker]) } val finalIter = if (outOfCore) { - val iter = GpuOutOfCoreSortIterator(cbIter, sorter, + val iter = new GpuOutOfCoreSortIterator(cbIter, sorter, targetSize, opTime, sortTime, outputBatch, outputRows) onTaskCompletion(iter.close()) iter } else { - GpuSortEachBatchIterator(cbIter, sorter, singleBatch, + new GpuSortEachBatchIterator(cbIter, sorter, singleBatch, opTime, sortTime, outputBatch, outputRows) } if (taskTrackers.exists(_.nonEmpty)) { @@ -165,14 +165,14 @@ case class GpuSortExec( } } -case class GpuSortEachBatchIterator( - iter: Iterator[ColumnarBatch], - sorter: GpuSorter, - singleBatch: Boolean, - opTime: GpuMetric = NoopMetric, - sortTime: GpuMetric = NoopMetric, - outputBatches: GpuMetric = NoopMetric, - outputRows: GpuMetric = NoopMetric) extends Iterator[ColumnarBatch] { +class GpuSortEachBatchIterator( + val iter: Iterator[ColumnarBatch], + val sorter: GpuSorter, + val singleBatch: Boolean, + val opTime: GpuMetric, + val sortTime: GpuMetric, + val outputBatches: GpuMetric, + val outputRows: GpuMetric) extends Iterator[ColumnarBatch] with Serializable { override def hasNext: Boolean = iter.hasNext override def next(): ColumnarBatch = { @@ -238,8 +238,8 @@ object GpuSpillableProjectedSortEachBatchIterator { * Holds data for the out of core sort. It includes the batch of data and the first row in that * batch so we can sort the batches. */ -case class OutOfCoreBatch(buffer: SpillableColumnarBatch, - firstRow: UnsafeRow) extends AutoCloseable { +class OutOfCoreBatch(val buffer: SpillableColumnarBatch, + val firstRow: UnsafeRow) extends AutoCloseable { override def close(): Unit = buffer.close() } @@ -295,15 +295,15 @@ class Pending(cpuOrd: LazilyGeneratedOrdering) extends AutoCloseable { * the merged data is split and put back into a pending queue. The process repeats until we have * enough data to output. */ -case class GpuOutOfCoreSortIterator( - iter: Iterator[ColumnarBatch], - sorter: GpuSorter, - targetSize: Long, - opTime: GpuMetric, - sortTime: GpuMetric, - outputBatches: GpuMetric, - outputRows: GpuMetric) extends Iterator[ColumnarBatch] - with AutoCloseable { +class GpuOutOfCoreSortIterator( + val iter: Iterator[ColumnarBatch], + val sorter: GpuSorter, + val targetSize: Long, + val opTime: GpuMetric, + val sortTime: GpuMetric, + val outputBatches: GpuMetric, + val outputRows: GpuMetric) extends Iterator[ColumnarBatch] + with AutoCloseable with Serializable { /** * This has already sorted the data, and it still has the projected columns in it that need to @@ -328,7 +328,7 @@ case class GpuOutOfCoreSortIterator( // Used for converting between rows and columns when we have to put a cuttoff on the GPU // to know how much of the data after a merge sort is fully sorted. private lazy val converters = new GpuRowToColumnConverter( - TrampolineUtil.fromAttributes(sorter.projectedBatchSchema)) + DataTypeUtilsShim.fromAttributes(sorter.projectedBatchSchema)) /** * Convert the boundaries (first rows for each batch) into unsafe rows for use later on. @@ -434,7 +434,7 @@ case class GpuOutOfCoreSortIterator( if (ct.getRowCount > 0) { val sp = SpillableColumnarBatch(ct, sorter.projectedBatchTypes, SpillPriorities.ACTIVE_ON_DECK_PRIORITY) - pendingObs += OutOfCoreBatch(sp, lower) + pendingObs += new OutOfCoreBatch(sp, lower) } else { ct.close() } @@ -517,7 +517,7 @@ case class GpuOutOfCoreSortIterator( val cutoff = pending.peek().firstRow val result = RmmRapidsRetryIterator.withRetryNoSplit[ColumnVector] { withResource(converters.convertBatch(Array(cutoff), - TrampolineUtil.fromAttributes(sorter.projectedBatchSchema))) { cutoffCb => + DataTypeUtilsShim.fromAttributes(sorter.projectedBatchSchema))) { cutoffCb => withResource(mergedSpillBatch.getColumnarBatch()) { mergedBatch => sorter.upperBound(mergedBatch, cutoffCb) } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HashExprMetas.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HashExprMetas.scala index dc59a8dd3c9..98b191b32f7 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HashExprMetas.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HashExprMetas.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.rapids.execution.TrampolineUtil import org.apache.spark.sql.types.{ArrayType, StructType} /** Base meta for Murmur3-hash-like expressions. */ -case class Murmur3HashExprMeta[HEINT <: HashExpression[Int]]( +class Murmur3HashExprMeta[HEINT <: HashExpression[Int]]( expr: HEINT, override val conf: RapidsConf, override val parent: Option[RapidsMeta[_, _, _]], @@ -47,7 +47,7 @@ case class Murmur3HashExprMeta[HEINT <: HashExpression[Int]]( } /** Base meta for xxhash64-like expressions. */ -case class XxHash64ExprMeta[HE <: HashExpression[Long]]( +class XxHash64ExprMeta[HE <: HashExpression[Long]]( expr: HE, override val conf: RapidsConf, override val parent: Option[RapidsMeta[_, _, _]], diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostAlloc.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostAlloc.scala index 8086d75c253..1503e291272 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostAlloc.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostAlloc.scala @@ -28,12 +28,22 @@ import com.nvidia.spark.rapids.jni.{CpuRetryOOM, RmmSpark} import com.nvidia.spark.rapids.spill.SpillFramework import org.apache.spark.TaskContext -import org.apache.spark.internal.Logging import org.apache.spark.sql.rapids.GpuTaskMetrics -case class HostAllocResult(buffer: HostMemoryBuffer, isPinned: Boolean) -private class HostAlloc(nonPinnedLimit: Long) extends HostMemoryAllocator with Logging { +private class HostAlloc(nonPinnedLimit: Long) extends HostMemoryAllocator { + private def logTrace(msg: => String): Unit = { + HostAlloc.logTrace(msg) + } + + private def logInfo(msg: => String): Unit = { + HostAlloc.logInfo(msg) + } + + private def logWarning(msg: => String): Unit = { + HostAlloc.logWarning(msg) + } + private var currentNonPinnedAllocated: Long = 0L private val pinnedLimit: Long = PinnedMemoryPool.getTotalPoolSizeBytes // For now we are going to assume that we are the only ones calling into the pinned pool @@ -205,14 +215,14 @@ private class HostAlloc(nonPinnedLimit: Long) extends HostMemoryAllocator with L do { ret = ( if (preferPinned) { - tryAllocPinned(amount).map(HostAllocResult(_, isPinned = true)) + tryAllocPinned(amount).map(buffer => new HostAllocResult(buffer, true)) } else { - tryAllocNonPinned(amount).map(HostAllocResult(_, isPinned = false)) + tryAllocNonPinned(amount).map(buffer => new HostAllocResult(buffer, false)) }).orElse { if (preferPinned) { - tryAllocNonPinned(amount).map(HostAllocResult(_, isPinned = false)) + tryAllocNonPinned(amount).map(buffer => new HostAllocResult(buffer, false)) } else { - tryAllocPinned(amount).map(HostAllocResult(_, isPinned = true)) + tryAllocPinned(amount).map(buffer => new HostAllocResult(buffer, true)) } } if (ret.isEmpty) { @@ -226,7 +236,9 @@ private class HostAlloc(nonPinnedLimit: Long) extends HostMemoryAllocator with L allocAttemptFinishedWithoutException = true } finally { ret match { - case Some(HostAllocResult(buffer: HostMemoryBuffer, isPinned: Boolean)) => + case Some(result) => + val buffer = result.buffer + val isPinned = result.isPinned val metrics = GpuTaskMetrics.get metrics.incHostBytesAllocated(amount, isPinned) if (BOOKKEEP_MEMORY) { @@ -287,7 +299,25 @@ private class HostAlloc(nonPinnedLimit: Long) extends HostMemoryAllocator with L /** * A new API for host memory allocation. This can be used to limit the amount of host memory. */ -object HostAlloc extends Logging { +object HostAlloc { + private val log = org.slf4j.LoggerFactory.getLogger(getClass.getName.stripSuffix("$")) + + private def logTrace(msg: => String): Unit = { + if (log.isTraceEnabled) { + log.trace(msg) + } + } + + private def logInfo(msg: => String): Unit = { + if (log.isInfoEnabled) { + log.info(msg) + } + } + + private def logWarning(msg: => String): Unit = { + log.warn(msg) + } + private var singleton: HostAlloc = new HostAlloc(-1) private def getSingleton: HostAlloc = synchronized { @@ -334,7 +364,7 @@ object HostAlloc extends Logging { buff.synchronized { val previous = Option(buff.getEventHandler) val handlerToSet = previous.map { p => - MultiEventHandler(p, handler) + new MultiEventHandler(p, handler) }.getOrElse { handler } @@ -354,7 +384,7 @@ object HostAlloc extends Logging { case oldA: MultiEventHandler => // From how the MultiEventHandler is set up we know that b cannot be one val newA = removeEventHandlerFrom(oldA, handler) - MultiEventHandler(newA, multiEventHandler.b) + new MultiEventHandler(newA, multiEventHandler.b) case _ => multiEventHandler } @@ -390,8 +420,8 @@ object HostAlloc extends Logging { } } - private case class MultiEventHandler(a: MemoryBuffer.EventHandler, - b: MemoryBuffer.EventHandler) + private class MultiEventHandler(val a: MemoryBuffer.EventHandler, + val b: MemoryBuffer.EventHandler) extends MemoryBuffer.EventHandler { override def onClosed(i: Int): Unit = { var t: Option[Throwable] = None @@ -434,7 +464,7 @@ object HostAlloc extends Logging { } override def remove(addr: Long, amount: Long): Unit = totalMem.add(-amount) } - private case class MemoryUsageDetail(addr: Long, amount: Long, callStack: String) { + private class MemoryUsageDetail(val addr: Long, val amount: Long, val callStack: String) { override def toString: String = s"$amount bytes behind address $addr at $callStack" } @@ -445,7 +475,7 @@ object HostAlloc extends Logging { s"${details.values.mkString("\n")}" override def add(addr: Long, amount: Long, callstack: String): Unit = - details.put(addr, MemoryUsageDetail(addr, amount, callstack)) + details.put(addr, new MemoryUsageDetail(addr, amount, callstack)) override def remove(addr: Long, amount: Long): Unit = details.remove(addr) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala index ac4ea77c893..15a20deb01e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala @@ -28,7 +28,6 @@ import org.apache.arrow.memory.{ArrowBuf, ReferenceManager} import org.apache.arrow.vector.ValueVector import org.apache.spark.TaskContext -import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute @@ -38,7 +37,7 @@ import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch, ColumnVector} import org.apache.spark.sql.vectorized.rapids.AccessibleArrowColumnVector -object HostColumnarToGpu extends Logging { +object HostColumnarToGpu { // use reflection to get access to a private field in a class private def getClassFieldAccessible(className: String, fieldName: String) = { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/JoinGatherer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/JoinGatherer.scala index 5688ee30b30..25c37296c9b 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/JoinGatherer.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/JoinGatherer.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2025, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2021-2026, NVIDIA CORPORATION. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -155,7 +155,7 @@ object JoinGatherer { outOfBoundsPolicyRight: OutOfBoundsPolicy): JoinGatherer = { val left = JoinGatherer(leftMap, leftData, outOfBoundsPolicyLeft) val right = JoinGatherer(rightMap, rightData, outOfBoundsPolicyRight) - MultiJoinGather(left, right) + new MultiJoinGather(left, right) } def getRowsInNextBatch(gatherer: JoinGatherer, targetSize: Long, @@ -227,7 +227,7 @@ object LazySpillableColumnarBatch { def spillOnly(wrapped: LazySpillableColumnarBatch): LazySpillableColumnarBatch = wrapped match { case alreadyGood: AllowSpillOnlyLazySpillableColumnarBatchImpl => alreadyGood - case anythingElse => AllowSpillOnlyLazySpillableColumnarBatchImpl(anythingElse) + case anythingElse => new AllowSpillOnlyLazySpillableColumnarBatchImpl(anythingElse) } } @@ -236,7 +236,7 @@ object LazySpillableColumnarBatch { * batch it is only spilled. This is used for cases, like with a streaming hash join * where the data itself needs to out live the JoinGatherer it is handed off to. */ -case class AllowSpillOnlyLazySpillableColumnarBatchImpl(wrapped: LazySpillableColumnarBatch) +class AllowSpillOnlyLazySpillableColumnarBatchImpl(val wrapped: LazySpillableColumnarBatch) extends LazySpillableColumnarBatch { override def getBatch: ColumnarBatch = wrapped.getBatch @@ -749,7 +749,8 @@ class JoinGathererSameTable( /** * Join Gatherer for a left table and a right table */ -case class MultiJoinGather(left: JoinGatherer, right: JoinGatherer) extends JoinGatherer { +class MultiJoinGather(val left: JoinGatherer, val right: JoinGatherer) + extends JoinGatherer with Serializable { assert(left.numRowsLeft == right.numRowsLeft, "all gatherers much have the same number of rows to gather") diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RmmRapidsRetryIterator.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RmmRapidsRetryIterator.scala index 25ec92937e1..98119619e19 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RmmRapidsRetryIterator.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RmmRapidsRetryIterator.scala @@ -31,9 +31,9 @@ import com.nvidia.spark.rapids.jni.{CpuRetryOOM, CpuSplitAndRetryOOM, GpuRetryOO import com.nvidia.spark.rapids.spill.SpillFramework import org.apache.spark.TaskContext -import org.apache.spark.internal.Logging -object RmmRapidsRetryIterator extends Logging { +object RmmRapidsRetryIterator { + private val log = org.slf4j.LoggerFactory.getLogger(getClass.getName.stripSuffix("$")) /** * withRetry for Iterator[T]. This helper calls a function `fn` as it takes @@ -100,7 +100,7 @@ object RmmRapidsRetryIterator extends Logging { splitPolicy: T => Seq[T]) (fn: T => K): Iterator[K] = { val attemptIter = new AutoCloseableAttemptSpliterator( - SingleItemAutoCloseableIteratorInternal(input), fn, splitPolicy) + new SingleItemAutoCloseableIteratorInternal(input), fn, splitPolicy) new RmmRapidsRetryAutoCloseableIterator(attemptIter) } @@ -130,7 +130,7 @@ object RmmRapidsRetryIterator extends Logging { input: T) (fn: T => K): K = { val attemptIter = new AutoCloseableAttemptSpliterator( - SingleItemAutoCloseableIteratorInternal(input), fn) + new SingleItemAutoCloseableIteratorInternal(input), fn) drainSingleWithVerification( new RmmRapidsRetryAutoCloseableIterator(attemptIter)) } @@ -160,9 +160,9 @@ object RmmRapidsRetryIterator extends Logging { def withRetryNoSplit[T <: AutoCloseable, K]( input: Seq[T]) (fn: Seq[T] => K): K = { - val wrapped = AutoCloseableSeqInternal(input) + val wrapped = new AutoCloseableSeqInternal(input) val attemptIter = new AutoCloseableAttemptSpliterator( - SingleItemAutoCloseableIteratorInternal(wrapped), fn) + new SingleItemAutoCloseableIteratorInternal(wrapped), fn) drainSingleWithVerification( new RmmRapidsRetryAutoCloseableIterator(attemptIter)) } @@ -346,7 +346,7 @@ object RmmRapidsRetryIterator extends Logging { * @param ts the Seq to wrap * @tparam T the type of the items in `ts` */ - private case class AutoCloseableSeqInternal[T <: AutoCloseable](ts: Seq[T]) + private class AutoCloseableSeqInternal[T <: AutoCloseable](ts: Seq[T]) extends Seq[T] with AutoCloseable { override def close(): Unit = { ts.foreach(_.safeClose()) @@ -375,7 +375,7 @@ object RmmRapidsRetryIterator extends Logging { * @param ts the AutoCloseable item to close if this iterator hasn't been drained * @tparam T the type of `ts`, must be AutoCloseable */ - private case class SingleItemAutoCloseableIteratorInternal[T <: AutoCloseable](ts: T) + private class SingleItemAutoCloseableIteratorInternal[T <: AutoCloseable](ts: T) extends Iterator[T] with AutoCloseable { private var wasCalledSuccessfully = false @@ -754,7 +754,7 @@ object RmmRapidsRetryIterator extends Logging { } else { splitReason = SplitReason.CPU_OOM } - logInfo("splitReason is set " + + log.info("splitReason is set " + s"to ${splitReason} after checking isRetryOrSplitAndRetry, related exception:", ex) } @@ -773,7 +773,7 @@ object RmmRapidsRetryIterator extends Logging { } } if (splitReason == SplitReason.GPU_OOM || splitReason == SplitReason.CPU_OOM) { - logInfo(s"splitReason is set to ${splitReason} after checking " + + log.info(s"splitReason is set to ${splitReason} after checking " + s"causedByRetryOrSplit, related exception:", ex) } } @@ -791,7 +791,7 @@ object RmmRapidsRetryIterator extends Logging { if (isOrCausedByColumnSizeOverflow(ex)) { // CUDF column size overflow? Attempt split-retry. splitReason = SplitReason.CUDF_OVERFLOW - logInfo(s"splitReason is set to ${splitReason} after checking " + + log.info(s"splitReason is set to ${splitReason} after checking " + s"isOrCausedByColumnSizeOverflow, related exception:", ex) } else { // we want to throw early here, since we got an exception @@ -879,7 +879,7 @@ object RmmRapidsRetryIterator extends Logging { s" minimum: ${target.minSize}") } } - Seq(AutoCloseableTargetSize(newTarget, target.minSize, target.dataSize)) + Seq(new AutoCloseableTargetSize(newTarget, target.minSize, target.dataSize)) } } @@ -937,8 +937,8 @@ object RmmRapidsRetryIterator extends Logging { private def logSpillFrameworkSummary(): Unit = { // print spillable status - logInfo(SpillFramework.getHostStoreSpillableSummary) - logInfo(SpillFramework.getDeviceStoreSpillableSummary) + log.info(SpillFramework.getHostStoreSpillableSummary) + log.info(SpillFramework.getDeviceStoreSpillableSummary) } // For GPU/CPU SplitAndRetryOOM, we are very interested what each task is doing when one @@ -956,12 +956,12 @@ object RmmRapidsRetryIterator extends Logging { } sb.append("\n\n") }) - logInfo(sb.toString()) + log.info(sb.toString()) } private def logMemoryBookkeeping(): Unit = { // use synchronized to keep neat // print host memory bookkeeping - logInfo(HostAlloc.getHostAllocBookkeepSummary()) + log.info(HostAlloc.getHostAllocBookkeepSummary()) // print device memory bookkeeping // TODO: uncomment this once we have device memory bookkeeping in spark-rapids-jni @@ -976,12 +976,6 @@ object RmmRapidsRetryIterator extends Logging { * `CpuSplitAndRetryOOM`, a split policy like `splitTargetSizeInHalfGpu` or * `splitTargetSizeInHalfCpu` can be used to retry the block with a smaller target size. */ -case class AutoCloseableTargetSize(targetSize: Long, minSize: Long, - dataSize: Long = 0) extends AutoCloseable { - def this(targetSize: Long, minSize: Long) = this(targetSize, minSize, 0) - override def close(): Unit = () -} - /** * This leverages a ThreadLocal of boolean to track if a task thread is currently * executing a retry. And the boolean state will be used by all the diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SortUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SortUtils.scala index 8203cefdadd..ac25584833d 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SortUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SortUtils.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2025, NVIDIA CORPORATION. + * Copyright (c) 2019-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -413,7 +413,7 @@ class GpuSorter( } } -case class GpuSortOrderMeta( +class GpuSortOrderMeta( sortOrder: SortOrder, override val conf: RapidsConf, parentOpt: Option[RapidsMeta[_, _, _]], diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala index afab2a5ae09..3ec1b932a19 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala @@ -33,7 +33,6 @@ import com.nvidia.spark.rapids.jni.GpuSplitAndRetryOOM import com.nvidia.spark.rapids.shims._ import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskContext} -import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -53,7 +52,7 @@ class GpuProjectExecMeta( conf: RapidsConf, p: Option[RapidsMeta[_, _, _]], r: DataFromReplacementRule) extends SparkPlanMeta[ProjectExec](proj, conf, p, r) - with Logging { + with RapidsLocalLog { override def convertToGpu(): GpuExec = { // Force list to avoid recursive Java serialization of lazy list Seq implementation val gpuExprs = childExprs.map(_.convertToGpu().asInstanceOf[NamedExpression]).toList @@ -1012,7 +1011,7 @@ case class GpuProjectAstExec( * Input columns for tier 3: a, c, e, f, ref2, ref3 * Tier 3: (ref2 * e), (ref3 * f), (a + e), (c + f) */ - case class GpuTieredProject(exprTiers: Seq[Seq[GpuExpression]]) { + class GpuTieredProject(val exprTiers: Seq[Seq[GpuExpression]]) extends Serializable { /** * Inject metrics into all expressions across all tiers. @@ -1319,7 +1318,7 @@ object GpuFilter { } } -case class GpuFilterExecMeta( +class GpuFilterExecMeta( filter: FilterExec, override val conf: RapidsConf, parentMetaOpt: Option[RapidsMeta[_, _, _]], @@ -1400,7 +1399,7 @@ class GpuSampleExecMeta( conf: RapidsConf, p: Option[RapidsMeta[_, _, _]], r: DataFromReplacementRule) extends SparkPlanMeta[SampleExec](sample, conf, p, r) - with Logging { + with RapidsLocalLog { override def convertToGpu(): GpuExec = { val gpuChild = childPlans.head.convertIfNeeded() if (conf.isFastSampleEnabled) { @@ -1569,7 +1568,7 @@ private[rapids] class GpuRangeIterator( step: Long, maxRowCountPerBatch: Long, taskContext: TaskContext, - opTime: GpuMetric) extends Iterator[ColumnarBatch] with Logging { + opTime: GpuMetric) extends Iterator[ColumnarBatch] with RapidsLocalLog { // This iterator is designed for GpuRangeExec, so it has the requirement for the inputs. assert((partitionEnd - partitionStart) % step == 0) @@ -1609,7 +1608,7 @@ private[rapids] class GpuRangeIterator( val remainingRows = (safePartitionEnd - start) / step // Start is inclusive so we need to produce at least one row val rowsExpected = Math.max(1, Math.min(remainingRows, maxRowCountPerBatch)) - val iter = withRetry(AutoCloseableLong(rowsExpected), reduceRowsNumberByHalf) { rows => + val iter = withRetry(new AutoCloseableLong(rowsExpected), reduceRowsNumberByHalf) { rows => withResource(Scalar.fromLong(start)) { startScalar => withResource(Scalar.fromLong(step)) { stepScalar => withResource( @@ -1658,12 +1657,12 @@ private[rapids] class GpuRangeIterator( throw new GpuSplitAndRetryOOM(s"GPU OutOfMemory: the number of rows generated is" + s" too small to be split ${rowsNumber.value}!") } - Seq(AutoCloseableLong(rowsNumber.value / 2)) + Seq(new AutoCloseableLong(rowsNumber.value / 2)) } } /** A bridge class between Long and AutoCloseable for retry */ - case class AutoCloseableLong(value: Long) extends AutoCloseable { + class AutoCloseableLong(val value: Long) extends AutoCloseable { override def close(): Unit = { /* Nothing to be closed */ } } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/TreeNode.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/TreeNode.scala index 10a750ad618..9cb3ca9c254 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/TreeNode.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/TreeNode.scala @@ -19,6 +19,9 @@ package com.nvidia.spark.rapids.shims import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, Expression, TernaryExpression, UnaryExpression} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryCommand} import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.execution.command.DataWritingCommand +import org.apache.spark.sql.rapids.shims.TrampolineConnectShims.SparkSession +import org.apache.spark.sql.vectorized.ColumnarBatch trait ShimExpression extends Expression { override def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression = { @@ -69,3 +72,18 @@ trait ShimUnaryCommand extends UnaryCommand { legacyWithNewChildren(Seq(newChild)) } } + +trait ShimDataWritingCommand extends DataWritingCommand with ShimUnaryCommand { + def runColumnar(sparkSession: SparkSession, child: SparkPlan): Seq[ColumnarBatch] + + def runColumnarFromAny(sparkSession: AnyRef, child: SparkPlan): Seq[ColumnarBatch] = { + runColumnar(sparkSession.asInstanceOf[SparkSession], child) + } + + override def run(sparkSession: SparkSession, child: SparkPlan): Seq[org.apache.spark.sql.Row] = { + com.nvidia.spark.rapids.Arm.withResource(runColumnar(sparkSession, child)) { batches => + assert(batches.isEmpty) + } + Seq.empty[org.apache.spark.sql.Row] + } +} diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuRunningWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuRunningWindowExec.scala index a87aea8b954..a1e7271e6b1 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuRunningWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuRunningWindowExec.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024-2025, NVIDIA CORPORATION. + * Copyright (c) 2024-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -242,10 +242,10 @@ case class GpuRunningWindowExec( } if (gpuPartitionSpec.isEmpty) { // If unpartitioned, batch on the order-by column. - BatchedByKey(gpuOrderSpec)(cpuOrderSpec) + new BatchedByKey(gpuOrderSpec)(cpuOrderSpec) } else { // If partitioned, batch on partition-columns + order-by columns. - BatchedByKey(gpuPartitionOrdering ++ gpuOrderSpec)(cpuPartitionOrdering ++ cpuOrderSpec) + new BatchedByKey(gpuPartitionOrdering ++ gpuOrderSpec)(cpuPartitionOrdering ++ cpuOrderSpec) } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuWindowExec.scala index 8bf8a0e9dd3..7617a807260 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuWindowExec.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2025, NVIDIA CORPORATION. + * Copyright (c) 2020-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -157,7 +157,7 @@ case class GpuWindowExec( override def outputBatching: CoalesceGoal = if (gpuPartitionSpec.isEmpty) { RequireSingleBatch } else { - BatchedByKey(gpuPartitionOrdering)(cpuPartitionOrdering) + new BatchedByKey(gpuPartitionOrdering)(cpuPartitionOrdering) } override protected def internalDoExecuteColumnar(): RDD[ColumnarBatch] = { diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala index 4365bc109a3..7d21ec91452 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala @@ -784,7 +784,7 @@ class GpuDynamicPartitionDataConcurrentWriter( (tt.sortTime, tt.sortOpTime) }.getOrElse((NoopMetric, NoopMetric)) - val sortIter = GpuOutOfCoreSortIterator(pendingCbsIter ++ iterator, + val sortIter = new GpuOutOfCoreSortIterator(pendingCbsIter ++ iterator, new GpuSorter(spec.sortOrder, spec.output, Map.empty[String, GpuMetric]), GpuSortExec.targetSize(spec.batchSize), sortOpTime, sortMetric, NoopMetric, NoopMetric) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala index bd7c4147827..3726566c866 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala @@ -419,7 +419,11 @@ object GpuShuffleExchangeExecBase { }.toArray.toSeq val sorter = new GpuSorter(boundReferences, outputAttributes, metrics) rdd.mapPartitions { cbIter => - GpuSortEachBatchIterator(cbIter, sorter, false) + new GpuSortEachBatchIterator(cbIter, sorter, false, + opTime = NoopMetric, + sortTime = NoopMetric, + outputBatches = NoopMetric, + outputRows = NoopMetric) } } else { rdd diff --git a/sql-plugin/src/main/spark400db173/scala/com/nvidia/spark/rapids/shims/SparkShims.scala b/sql-plugin/src/main/spark400db173/scala/com/nvidia/spark/rapids/shims/SparkShims.scala index 4f2994f1044..8d50f882110 100644 --- a/sql-plugin/src/main/spark400db173/scala/com/nvidia/spark/rapids/shims/SparkShims.scala +++ b/sql-plugin/src/main/spark400db173/scala/com/nvidia/spark/rapids/shims/SparkShims.scala @@ -43,12 +43,12 @@ object SparkShimImpl extends Spark400PlusDBShims { GpuOverrides.expr[CollationAwareMurmur3Hash]( "Collation-aware murmur3 hash operator", HashExprChecks.murmur3ProjectChecks, - Murmur3HashExprMeta.apply + ((expr, conf, parent, rule) => new Murmur3HashExprMeta(expr, conf, parent, rule)) ), GpuOverrides.expr[CollationAwareXxHash64]( "Collation-aware xxhash64 operator", HashExprChecks.xxhash64ProjectChecks, - XxHash64ExprMeta.apply + ((expr, conf, parent, rule) => new XxHash64ExprMeta(expr, conf, parent, rule)) ) ).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap super.getExprs ++ shimExprs diff --git a/sql-plugin/src/main/spark401/scala/com/nvidia/spark/rapids/shims/SparkShims.scala b/sql-plugin/src/main/spark401/scala/com/nvidia/spark/rapids/shims/SparkShims.scala index 46ac248006c..a2a321c4c2d 100644 --- a/sql-plugin/src/main/spark401/scala/com/nvidia/spark/rapids/shims/SparkShims.scala +++ b/sql-plugin/src/main/spark401/scala/com/nvidia/spark/rapids/shims/SparkShims.scala @@ -31,12 +31,12 @@ object SparkShimImpl extends Spark400PlusCommonShims { GpuOverrides.expr[CollationAwareMurmur3Hash]( "Collation-aware murmur3 hash operator", HashExprChecks.murmur3ProjectChecks, - Murmur3HashExprMeta.apply + ((expr, conf, parent, rule) => new Murmur3HashExprMeta(expr, conf, parent, rule)) ), GpuOverrides.expr[CollationAwareXxHash64]( "Collation-aware xxhash64 operator", HashExprChecks.xxhash64ProjectChecks, - XxHash64ExprMeta.apply + ((expr, conf, parent, rule) => new XxHash64ExprMeta(expr, conf, parent, rule)) ) ).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap super.getExprs ++ shimExprs diff --git a/sql-plugin/src/main/spark411/scala/com/nvidia/spark/rapids/shims/SparkShims.scala b/sql-plugin/src/main/spark411/scala/com/nvidia/spark/rapids/shims/SparkShims.scala index f3f6c6bb675..dca0bcef176 100644 --- a/sql-plugin/src/main/spark411/scala/com/nvidia/spark/rapids/shims/SparkShims.scala +++ b/sql-plugin/src/main/spark411/scala/com/nvidia/spark/rapids/shims/SparkShims.scala @@ -36,12 +36,12 @@ object SparkShimImpl extends Spark400PlusCommonShims with RebaseShims { GpuOverrides.expr[CollationAwareMurmur3Hash]( "Collation-aware murmur3 hash operator", HashExprChecks.murmur3ProjectChecks, - Murmur3HashExprMeta.apply + ((expr, conf, parent, rule) => new Murmur3HashExprMeta(expr, conf, parent, rule)) ), GpuOverrides.expr[CollationAwareXxHash64]( "Collation-aware xxhash64 operator", HashExprChecks.xxhash64ProjectChecks, - XxHash64ExprMeta.apply + ((expr, conf, parent, rule) => new XxHash64ExprMeta(expr, conf, parent, rule)) ) ).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap // Include TimeAddShims for TimestampAddInterval support in 4.1.0 diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuSemaphoreSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuSemaphoreSuite.scala index 076139ebcb0..a47f860c8d6 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuSemaphoreSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuSemaphoreSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -86,15 +86,17 @@ class GpuSemaphoreSuite extends AnyFunSuite def assertAcquired(result: TryAcquireResult): Unit = result match { case SemaphoreAcquired => // NOOP - case AcquireFailed(_) => + case _: AcquireFailed => fail("The Semaphore was not acquired") } def assertNotAcquired(numExpectedWaiting: Int, result: TryAcquireResult): Unit = result match { case SemaphoreAcquired => fail("The Semaphore was acquired when we didn't expect it") - case AcquireFailed(numWaiting) => - assert(numWaiting == numExpectedWaiting, "The number of waiting tasks didn't match") + case failed: AcquireFailed => + assert( + failed.numWaitingTasks == numExpectedWaiting, + "The number of waiting tasks didn't match") } test("multi tryAcquire") { diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuSortRetrySuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuSortRetrySuite.scala index 6e1542f7631..1e9be0c55d9 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuSortRetrySuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuSortRetrySuite.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023-2025, NVIDIA CORPORATION. + * Copyright (c) 2023-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -176,10 +176,14 @@ class GpuSortRetrySuite extends RmmSparkRetrySuiteBase with MockitoSugar { } test("GPU each batch sort with GpuRetryOOM") { - val eachBatchIter = GpuSortEachBatchIterator( + val eachBatchIter = new GpuSortEachBatchIterator( batchIter(2), gpuSorter, - singleBatch = false) + false, + NoopMetric, + NoopMetric, + NoopMetric, + NoopMetric) RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 2, RmmSpark.OomInjectionType.GPU.ordinal, 0) while (eachBatchIter.hasNext) { @@ -201,10 +205,14 @@ class GpuSortRetrySuite extends RmmSparkRetrySuiteBase with MockitoSugar { test("GPU each batch sort throws GpuSplitAndRetryOOM") { val inputIter = batchIter(2) try { - val eachBatchIter = GpuSortEachBatchIterator( + val eachBatchIter = new GpuSortEachBatchIterator( inputIter, gpuSorter, - singleBatch = false) + false, + NoopMetric, + NoopMetric, + NoopMetric, + NoopMetric) RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1, RmmSpark.OomInjectionType.GPU.ordinal, 0) assertThrows[GpuSplitAndRetryOOM] { diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/unit/ComplexCreatorSizeEstimationTest.scala b/tests/src/test/scala/com/nvidia/spark/rapids/unit/ComplexCreatorSizeEstimationTest.scala index bb6437fb6fb..dd9b1ae2737 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/unit/ComplexCreatorSizeEstimationTest.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/unit/ComplexCreatorSizeEstimationTest.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2025, NVIDIA CORPORATION. + * Copyright (c) 2025-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -52,7 +52,7 @@ class ComplexCreatorSizeEstimationTest extends GpuUnitTests { GpuColumnVector.getTotalDeviceMemoryUsed(proCb) } val estimatedSize = PreProjectSplitIterator.calcMinOutputSize(inCb, - GpuTieredProject(Seq(boundList))) + new GpuTieredProject(Seq(boundList))) assertResult(actualSize)(estimatedSize) } } diff --git a/tools/generated_files/411/operatorsScore.csv b/tools/generated_files/411/operatorsScore.csv index 87b8a403879..9bcbd035158 100644 --- a/tools/generated_files/411/operatorsScore.csv +++ b/tools/generated_files/411/operatorsScore.csv @@ -302,7 +302,6 @@ Subtract,4 Sum,4 Tan,4 Tanh,4 -TimestampAddInterval,4 ToDegrees,4 ToRadians,4 ToUTCTimestamp,4 diff --git a/tools/generated_files/411/supportedExprs.csv b/tools/generated_files/411/supportedExprs.csv index a710e653b43..b14305229f8 100644 --- a/tools/generated_files/411/supportedExprs.csv +++ b/tools/generated_files/411/supportedExprs.csv @@ -659,9 +659,6 @@ Tanh,S,`tanh`,None,project,input,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA, Tanh,S,`tanh`,None,project,result,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA Tanh,S,`tanh`,None,AST,input,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA Tanh,S,`tanh`,None,AST,result,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA -TimestampAddInterval,S, ,None,project,start,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA -TimestampAddInterval,S, ,None,project,interval,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,S,NA -TimestampAddInterval,S, ,None,project,result,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA ToDegrees,S,`degrees`,None,project,input,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA ToDegrees,S,`degrees`,None,project,result,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA ToRadians,S,`radians`,None,project,input,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA