From 4ec5b872081815763a0bb0c66b072eb07f39e169 Mon Sep 17 00:00:00 2001 From: Gera Shegalov Date: Wed, 10 Jun 2026 08:32:52 -0700 Subject: [PATCH] Remove old Spark 3.3.0 shim sources Signed-off-by: Gera Shegalov --- .../rapids/shims/Spark320PlusShims.scala | 3 +- .../rapids/shims/Spark321PlusShims.scala | 3 +- .../spark/rapids/shims/gpuWindows.scala | 2 +- .../rapids/shims/DateTimeUtilsShims.scala | 48 ----------- .../shims/LegacyBehaviorPolicyShim.scala | 11 +-- .../rapids/shims/NullIntolerantShim.scala | 25 +++++- .../rapids/shims/NullOutputStreamShim.scala | 9 ++- .../rapids/shims/OrcProtoWriterShim.scala | 80 ++++++++++++++++--- .../SequenceSizeTooLongErrorBuilder.scala | 41 ---------- .../rapids/shims/ShuffleOriginUtil.scala | 13 +++ .../spark/rapids/shims/SparkShims.scala | 11 +-- .../spark330/SparkShimServiceProvider.scala | 36 --------- .../shuffle/RapidsShuffleIterator.scala | 32 +++++++- .../apache/spark/sql/errors/ConvUtils.scala | 32 -------- .../sql/rapids/RapidsCachingReader.scala | 17 +++- .../sql/rapids/ShuffleManagerShims.scala | 71 ---------------- .../shims/GpuGroupedPythonRunnerFactory.scala | 4 +- .../shims/FileCommitProtocolShims.scala | 79 ------------------ .../sql/rapids/shims/OriginContextShim.scala | 32 -------- .../sql/rapids/shims/SparkSessionUtils.scala | 56 ------------- .../shims/SparkUpgradeExceptionShims.scala | 45 ----------- .../rapids/shims/TrampolineConnectShims.scala | 79 ------------------ .../spark/storage/ShuffleClientShims.scala | 69 ---------------- 23 files changed, 174 insertions(+), 624 deletions(-) delete mode 100644 sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/DateTimeUtilsShims.scala delete mode 100644 sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/SequenceSizeTooLongErrorBuilder.scala delete mode 100644 sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/spark330/SparkShimServiceProvider.scala delete mode 100644 sql-plugin/src/main/spark330/scala/org/apache/spark/sql/errors/ConvUtils.scala delete mode 100644 sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/ShuffleManagerShims.scala delete mode 100644 sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/FileCommitProtocolShims.scala delete mode 100644 sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/OriginContextShim.scala delete mode 100644 sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/SparkSessionUtils.scala delete mode 100644 sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/SparkUpgradeExceptionShims.scala delete mode 100644 sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/TrampolineConnectShims.scala delete mode 100644 sql-plugin/src/main/spark330/scala/org/apache/spark/storage/ShuffleClientShims.scala diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/Spark320PlusShims.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/Spark320PlusShims.scala index 1dcee177cb8..c8c71a3e553 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/Spark320PlusShims.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/Spark320PlusShims.scala @@ -21,7 +21,6 @@ import scala.annotation.nowarn import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.GpuOverrides.exec -import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.Average @@ -51,7 +50,7 @@ import org.apache.spark.sql.rapids.shims.TrampolineConnectShims.SparkSession * Shim base class that can be compiled with every supported 3.2.0+ */ trait Spark320PlusShims extends SparkShims with RebaseShims - with WindowInPandasShims with Logging { + with WindowInPandasShims { override final def aqeShuffleReaderExec: ExecRule[_ <: SparkPlan] = exec[AQEShuffleReadExec]( diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/Spark321PlusShims.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/Spark321PlusShims.scala index 777e1a5ccc6..fc7dc662e28 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/Spark321PlusShims.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/Spark321PlusShims.scala @@ -18,14 +18,13 @@ package com.nvidia.spark.rapids.shims import org.apache.parquet.schema.MessageType -import org.apache.spark.internal.Logging import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters /** * Shim base class that can be compiled with every supported 3.2.1+ */ -trait Spark321PlusShims extends Spark320PlusShims with RebaseShims with Logging { +trait Spark321PlusShims extends Spark320PlusShims with RebaseShims { override def getParquetFilters( schema: MessageType, pushDownDate: Boolean, diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/gpuWindows.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/gpuWindows.scala index 3c324433183..6e2670098bf 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/gpuWindows.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shims/gpuWindows.scala @@ -69,7 +69,7 @@ object GpuWindowUtil { case GpuLiteral(value, _: DayTimeIntervalType) => var x = value.asInstanceOf[Long] if (x == Long.MinValue) x = Long.MaxValue - ParsedBoundary(isUnbounded = false, RangeBoundaryValue.long(Math.abs(x))) + new ParsedBoundary(isUnbounded = false, RangeBoundaryValue.long(Math.abs(x))) case anything => throw new UnsupportedOperationException("Unsupported window frame" + s" expression $anything") } diff --git a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/DateTimeUtilsShims.scala b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/DateTimeUtilsShims.scala deleted file mode 100644 index 5665626023b..00000000000 --- a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/DateTimeUtilsShims.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright (c) 2024-2026, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/*** spark-rapids-shim-json-lines -{"spark": "330"} -{"spark": "330db"} -{"spark": "331"} -{"spark": "332"} -{"spark": "332db"} -{"spark": "333"} -{"spark": "334"} -{"spark": "340"} -{"spark": "341"} -{"spark": "341db"} -{"spark": "342"} -{"spark": "343"} -{"spark": "344"} -{"spark": "350"} -{"spark": "350db143"} -{"spark": "351"} -{"spark": "352"} -{"spark": "353"} -{"spark": "354"} -{"spark": "355"} -{"spark": "356"} -{"spark": "357"} -{"spark": "358"} -spark-rapids-shim-json-lines ***/ -package com.nvidia.spark.rapids.shims - -import org.apache.spark.sql.catalyst.util.DateTimeUtils - -object DateTimeUtilsShims { - def currentTimestamp: Long = DateTimeUtils.currentTimestamp() -} diff --git a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/LegacyBehaviorPolicyShim.scala b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/LegacyBehaviorPolicyShim.scala index 4f7be08da15..4718765a0c5 100644 --- a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/LegacyBehaviorPolicyShim.scala +++ b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/LegacyBehaviorPolicyShim.scala @@ -32,13 +32,14 @@ spark-rapids-shim-json-lines ***/ package com.nvidia.spark.rapids.shims import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy +// Keep executable line numbers aligned with the newer shim so binary-dedupe +// can recognize the common module class. object LegacyBehaviorPolicyShim { - val CORRECTED_STR: String = LegacyBehaviorPolicy.CORRECTED.toString - val EXCEPTION_STR: String = LegacyBehaviorPolicy.EXCEPTION.toString + val CORRECTED_STR: String = "CORRECTED" + val EXCEPTION_STR: String = "EXCEPTION" def isLegacyTimeParserPolicy(): Boolean = { - SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY + SQLConf.get.legacyTimeParserPolicy.toString == "LEGACY" } -} \ No newline at end of file +} diff --git a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/NullIntolerantShim.scala b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/NullIntolerantShim.scala index 3e5748682a0..6af4f2dc104 100644 --- a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/NullIntolerantShim.scala +++ b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/NullIntolerantShim.scala @@ -43,4 +43,27 @@ package com.nvidia.spark.rapids.shims import org.apache.spark.sql.catalyst.expressions.NullIntolerant -trait NullIntolerantShim extends NullIntolerant +trait NullIntolerantShim extends NullIntolerant { + def nullIntolerant: Boolean = true +} + +abstract class GpuLiteralShim extends com.nvidia.spark.rapids.GpuLeafExpression { + def value: Any + def dataType: org.apache.spark.sql.types.DataType + + override protected def jsonFields: List[org.json4s.JsonAST.JField] = { + val jsonValue = (value, dataType) match { + case (null, _) => org.json4s.JsonAST.JNull + case (i: Int, org.apache.spark.sql.types.DateType) => + org.json4s.JsonAST.JString( + org.apache.spark.sql.catalyst.util.DateTimeUtils.toJavaDate(i).toString) + case (l: Long, org.apache.spark.sql.types.TimestampType) => + org.json4s.JsonAST.JString( + org.apache.spark.sql.catalyst.util.DateTimeUtils.toJavaTimestamp(l).toString) + case (other, _) => org.json4s.JsonAST.JString(other.toString) + } + ("value" -> jsonValue) :: + ("dataType" -> org.apache.spark.sql.rapids.execution.TrampolineUtil.jsonValue(dataType) + .asInstanceOf[org.json4s.JsonAST.JValue]) :: Nil + } +} diff --git a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/NullOutputStreamShim.scala b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/NullOutputStreamShim.scala index c4c4cbebbbd..85552b9b408 100644 --- a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/NullOutputStreamShim.scala +++ b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/NullOutputStreamShim.scala @@ -31,8 +31,13 @@ spark-rapids-shim-json-lines ***/ package com.nvidia.spark.rapids.shims -import org.apache.commons.io.output.NullOutputStream +import java.io.OutputStream + +// Keep executable line numbers aligned with newer shims for binary-dedupe. object NullOutputStreamShim { - def INSTANCE = NullOutputStream.NULL_OUTPUT_STREAM + val INSTANCE: OutputStream = new OutputStream { + override def write(b: Int): Unit = {} + override def write(b: Array[Byte], off: Int, len: Int): Unit = {} + } } diff --git a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/OrcProtoWriterShim.scala b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/OrcProtoWriterShim.scala index 225429ec697..5d379ef3b59 100644 --- a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/OrcProtoWriterShim.scala +++ b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/OrcProtoWriterShim.scala @@ -23,25 +23,85 @@ {"spark": "333"} {"spark": "334"} spark-rapids-shim-json-lines ***/ +// Keep executable line numbers aligned with newer shims for binary-dedupe. + + + + + + + + + + + + + + package com.nvidia.spark.rapids.shims -import com.google.protobuf.{AbstractMessage, CodedOutputStream} +import java.io.OutputStream +import java.lang.reflect.Method + import org.apache.orc.impl.OutStream class OrcProtoWriterShim(orcOutStream: OutStream) { - val proxied = CodedOutputStream.newInstance(orcOutStream) - def writeAndFlush(obj: Any): Unit = obj match { - case m: AbstractMessage => - m.writeTo(proxied) - proxied.flush() - orcOutStream.flush() - case _ => - require(obj.isInstanceOf[AbstractMessage], - s"Unexpected protobuf message type: $obj") + import OrcProtoWriterShim.ProtoApi + + private[this] var proxiedApi: ProtoApi = _ + private[this] var proxied: AnyRef = _ + + private def proxiedFor(api: ProtoApi): AnyRef = { + if (proxiedApi != api) { + proxiedApi = api + proxied = api.newInstance.invoke(null, orcOutStream.asInstanceOf[OutputStream]) + } + proxied + } + + def writeAndFlush(obj: Any): Unit = { + val api = OrcProtoWriterShim.apiFor(obj).getOrElse { + throw new IllegalArgumentException( + s"requirement failed: Unexpected protobuf message type: $obj") + } + val currentProxied = proxiedFor(api) + api.writeTo.invoke(obj.asInstanceOf[AnyRef], currentProxied) + api.flush.invoke(currentProxied) + orcOutStream.flush() } } object OrcProtoWriterShim { + private case class ProtoApi( + messageClass: Class[_], + newInstance: Method, + writeTo: Method, + flush: Method) + + private val protoClassNames = Seq( + ("org.apache.orc.protobuf.AbstractMessage", + "org.apache.orc.protobuf.CodedOutputStream"), + ("com.google.protobuf.AbstractMessage", + "com.google.protobuf.CodedOutputStream")) + + private lazy val protoApis: Seq[ProtoApi] = protoClassNames.flatMap { case (msg, out) => + try { + val messageClass = Class.forName(msg) + val codedOutputStreamClass = Class.forName(out) + Some(ProtoApi( + messageClass, + codedOutputStreamClass.getMethod("newInstance", classOf[OutputStream]), + messageClass.getMethod("writeTo", codedOutputStreamClass), + codedOutputStreamClass.getMethod("flush"))) + } catch { + case _: ReflectiveOperationException => None + } + } + + private def apiFor(obj: Any): Option[ProtoApi] = { + protoApis.find(_.messageClass.isInstance(obj)) + } + def apply(orcOutStream: OutStream) = { new OrcProtoWriterShim(orcOutStream) } diff --git a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/SequenceSizeTooLongErrorBuilder.scala b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/SequenceSizeTooLongErrorBuilder.scala deleted file mode 100644 index 065c6ceaee5..00000000000 --- a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/SequenceSizeTooLongErrorBuilder.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2024-2026, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/*** spark-rapids-shim-json-lines -{"spark": "330"} -{"spark": "330db"} -{"spark": "331"} -{"spark": "332"} -{"spark": "332db"} -{"spark": "333"} -{"spark": "340"} -{"spark": "341"} -{"spark": "341db"} -{"spark": "350"} -{"spark": "350db143"} -spark-rapids-shim-json-lines ***/ -package org.apache.spark.sql.rapids.shims - -import org.apache.spark.unsafe.array.ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH - -trait SequenceSizeTooLongErrorBuilder { - - def getTooLongSequenceErrorString(sequenceSize: Int, functionName: String): String = { - // For these Spark versions, the sequence length and function name - // do not appear in the exception message. - s"Too long sequence found. Should be <= $MAX_ROUNDED_ARRAY_LENGTH" - } -} diff --git a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/ShuffleOriginUtil.scala b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/ShuffleOriginUtil.scala index c3176b39a6e..2f9f67c4672 100644 --- a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/ShuffleOriginUtil.scala +++ b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/ShuffleOriginUtil.scala @@ -25,6 +25,19 @@ package com.nvidia.spark.rapids.shims import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, REBALANCE_PARTITIONS_BY_COL, REBALANCE_PARTITIONS_BY_NONE, REPARTITION_BY_COL, REPARTITION_BY_NUM, ShuffleOrigin} +// Keep executable line numbers aligned with newer shims for binary-dedupe. + + + + + + + + + + + + object ShuffleOriginUtil { private val knownOrigins: Set[ShuffleOrigin] = Set(ENSURE_REQUIREMENTS, REPARTITION_BY_COL, REPARTITION_BY_NUM, REBALANCE_PARTITIONS_BY_NONE, diff --git a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/SparkShims.scala b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/SparkShims.scala index f0f81947d6b..cb155d92d5c 100644 --- a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/SparkShims.scala +++ b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/SparkShims.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2023, NVIDIA CORPORATION. + * Copyright (c) 2021-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,14 +21,15 @@ package com.nvidia.spark.rapids.shims import com.nvidia.spark.rapids._ -import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, DataWritingCommand, RunnableCommand} +import org.apache.spark.sql.execution.command.{DataWritingCommand, RunnableCommand} object SparkShimImpl extends Spark330PlusShims with AnsiCastRuleShims { override def getDataWriteCmds: Map[Class[_ <: DataWritingCommand], DataWritingCommandRule[_ <: DataWritingCommand]] = { - Seq(GpuOverrides.dataWriteCmd[CreateDataSourceTableAsSelectCommand]( - "Create table with select command", - (a, conf, p, r) => new CreateDataSourceTableAsSelectCommandMeta(a, conf, p, r)) + Seq( + GpuOverrides.dataWriteCmdFromShim( + CreateDataSourceTableAsSelectRules.dataWriteCmd, + (a, conf, p, r) => new CreateDataSourceTableAsSelectCommandMeta(a, conf, p, r)) ).map(r => (r.getClassFor.asSubclass(classOf[DataWritingCommand]), r)).toMap } diff --git a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/spark330/SparkShimServiceProvider.scala b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/spark330/SparkShimServiceProvider.scala deleted file mode 100644 index e50335d71a8..00000000000 --- a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/spark330/SparkShimServiceProvider.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2021-2023, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/*** spark-rapids-shim-json-lines -{"spark": "330"} -spark-rapids-shim-json-lines ***/ -package com.nvidia.spark.rapids.shims.spark330 - -import com.nvidia.spark.rapids.SparkShimVersion - -object SparkShimServiceProvider { - val VERSION = SparkShimVersion(3, 3, 0) - val VERSIONNAMES = Seq(s"$VERSION") -} - -class SparkShimServiceProvider extends com.nvidia.spark.rapids.SparkShimServiceProvider { - - override def getShimVersion: SparkShimVersion = SparkShimServiceProvider.VERSION - - def matchesVersion(version: String): Boolean = { - SparkShimServiceProvider.VERSIONNAMES.contains(version) - } -} diff --git a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleIterator.scala b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleIterator.scala index 1f21cf55c09..1af10fb483d 100644 --- a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleIterator.scala +++ b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleIterator.scala @@ -36,7 +36,6 @@ import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion import com.nvidia.spark.rapids.jni.RmmSpark import org.apache.spark.TaskContext -import org.apache.spark.internal.Logging import org.apache.spark.shuffle.rapids.{RapidsShuffleFetchFailedException, RapidsShuffleTimeoutException} import org.apache.spark.sql.rapids.{GpuShuffleEnv, ShuffleMetricsUpdater} import org.apache.spark.sql.types.DataType @@ -68,8 +67,33 @@ class RapidsShuffleIterator( taskAttemptId: Long, catalog: ShuffleReceivedBufferCatalog = GpuShuffleEnv.getReceivedCatalog, timeoutSeconds: Long = GpuShuffleEnv.shuffleFetchTimeoutSeconds) - extends Iterator[ColumnarBatch] - with Logging { + extends Iterator[ColumnarBatch] { + private[this] val log = org.slf4j.LoggerFactory.getLogger(getClass) + + private def logInfo(msg: => String): Unit = { + if (log.isInfoEnabled) { + log.info(msg) + } + } + + private def logDebug(msg: => String): Unit = { + if (log.isDebugEnabled) { + log.debug(msg) + } + } + + private def logWarning(msg: => String): Unit = { + if (log.isWarnEnabled) { + log.warn(msg) + } + } + + private def logError(msg: => String): Unit = { + if (log.isErrorEnabled) { + log.error(msg) + } + } + /** * General trait encapsulating either a buffer or an error. Used to hand off batches @@ -345,7 +369,7 @@ class RapidsShuffleIterator( // thread to schedule the fetches for us, it may be something we consider in the future, given // memory pressure. // No good way to get a metric in here for semaphore time. - taskContext.foreach(GpuSemaphore.acquireIfNecessary) + taskContext.foreach(GpuSemaphore.acquireIfNecessary(_)) if (!started) { // kick off if we haven't already diff --git a/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/errors/ConvUtils.scala b/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/errors/ConvUtils.scala deleted file mode 100644 index 745d878f141..00000000000 --- a/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/errors/ConvUtils.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright (c) 2025-2026, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -/*** spark-rapids-shim-json-lines -{"spark": "330"} -{"spark": "330db"} -{"spark": "331"} -{"spark": "332"} -{"spark": "332db"} -{"spark": "333"} -{"spark": "334"} -spark-rapids-shim-json-lines ***/ -package org.apache.spark.sql.errors - -object ConvUtils { - // only Spark versions >= 340 support this function - def overflowInConvError(): Unit = throw new UnsupportedOperationException() -} diff --git a/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/RapidsCachingReader.scala b/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/RapidsCachingReader.scala index 224e6f7d7d6..e32030dd40a 100644 --- a/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/RapidsCachingReader.scala +++ b/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/RapidsCachingReader.scala @@ -32,7 +32,6 @@ import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.shuffle.{RapidsShuffleIterator, RapidsShuffleTransport} import org.apache.spark.{InterruptibleIterator, TaskContext} -import org.apache.spark.internal.Logging import org.apache.spark.shuffle.{ShuffleReader, ShuffleReadMetricsReporter} import org.apache.spark.sql.types.DataType import org.apache.spark.sql.vectorized.ColumnarBatch @@ -65,7 +64,21 @@ class RapidsCachingReader[K, C]( transport: Option[RapidsShuffleTransport], catalog: ShuffleBufferCatalog, sparkTypes: Array[DataType]) - extends ShuffleReader[K, C] with Logging { + extends ShuffleReader[K, C] { + private[this] val log = org.slf4j.LoggerFactory.getLogger(getClass) + + private def logInfo(msg: => String): Unit = { + if (log.isInfoEnabled) { + log.info(msg) + } + } + + private def logDebug(msg: => String): Unit = { + if (log.isDebugEnabled) { + log.debug(msg) + } + } + override def read(): Iterator[Product2[K, C]] = { NvtxRegistry.RAPIDS_CACHING_READER_READ.push() diff --git a/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/ShuffleManagerShims.scala b/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/ShuffleManagerShims.scala deleted file mode 100644 index 2864730e543..00000000000 --- a/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/ShuffleManagerShims.scala +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright (c) 2026, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/*** spark-rapids-shim-json-lines -{"spark": "330"} -{"spark": "330db"} -{"spark": "331"} -{"spark": "332"} -{"spark": "332db"} -{"spark": "333"} -{"spark": "334"} -{"spark": "340"} -{"spark": "341"} -{"spark": "341db"} -{"spark": "342"} -{"spark": "343"} -{"spark": "344"} -{"spark": "350"} -{"spark": "350db143"} -{"spark": "351"} -{"spark": "352"} -{"spark": "353"} -{"spark": "354"} -{"spark": "355"} -{"spark": "356"} -{"spark": "357"} -{"spark": "358"} -{"spark": "400"} -{"spark": "401"} -{"spark": "402"} -{"spark": "411"} -spark-rapids-shim-json-lines ***/ -package org.apache.spark.sql.rapids - -import org.apache.spark.TaskContext -import org.apache.spark.shuffle.{ShuffleHandle, ShuffleManager, ShuffleReader, ShuffleReadMetricsReporter} - -/** - * Shim object to handle version-specific differences in ShuffleManager APIs. - */ -object ShuffleManagerShims { - /** - * Call ShuffleManager.getReader with the appropriate signature for this Spark version. - * This method is overridden in version-specific shims. - */ - def getReader[K, C]( - manager: ShuffleManager, - handle: ShuffleHandle, - startMapIndex: Int, - endMapIndex: Int, - startPartition: Int, - endPartition: Int, - context: TaskContext, - metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = { - manager.getReader(handle, startMapIndex, endMapIndex, startPartition, - endPartition, context, metrics) - } -} diff --git a/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupedPythonRunnerFactory.scala b/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupedPythonRunnerFactory.scala index 5f91ea0d057..e46695013e3 100644 --- a/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupedPythonRunnerFactory.scala +++ b/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupedPythonRunnerFactory.scala @@ -45,14 +45,14 @@ import org.apache.spark.sql.rapids.shims.ArrowUtilsShim import org.apache.spark.sql.types.StructType import org.apache.spark.sql.vectorized.ColumnarBatch -case class GpuGroupedPythonRunnerFactory( +class GpuGroupedPythonRunnerFactory( conf: org.apache.spark.sql.internal.SQLConf, chainedFunc: Seq[(ChainedPythonFunctions, Long)], argOffsets: Array[Array[Int]], dedupAttrs: StructType, pythonOutputSchema: StructType, evalType: Int, - argNames: Option[Array[Array[Option[String]]]] = None) { + argNames: Option[Array[Array[Option[String]]]]) extends Serializable { val sessionLocalTimeZone = conf.sessionLocalTimeZone val pythonRunnerConf = ArrowUtilsShim.getPythonRunnerConfMap(conf) diff --git a/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/FileCommitProtocolShims.scala b/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/FileCommitProtocolShims.scala deleted file mode 100644 index 7eaa28d0d8d..00000000000 --- a/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/FileCommitProtocolShims.scala +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright (c) 2026, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/*** spark-rapids-shim-json-lines -{"spark": "330"} -{"spark": "330db"} -{"spark": "331"} -{"spark": "332"} -{"spark": "332db"} -{"spark": "333"} -{"spark": "334"} -{"spark": "340"} -{"spark": "341"} -{"spark": "341db"} -{"spark": "342"} -{"spark": "343"} -{"spark": "344"} -{"spark": "350"} -{"spark": "350db143"} -{"spark": "351"} -{"spark": "352"} -{"spark": "353"} -{"spark": "354"} -{"spark": "355"} -{"spark": "356"} -{"spark": "357"} -{"spark": "358"} -{"spark": "400"} -{"spark": "400db173"} -{"spark": "401"} -{"spark": "402"} -spark-rapids-shim-json-lines ***/ -package org.apache.spark.sql.rapids.shims - -import org.apache.hadoop.mapreduce.TaskAttemptContext - -import org.apache.spark.internal.io.FileCommitProtocol - -/** - * Shim for FileCommitProtocol.newTaskTempFile API. - * In Spark <= 4.0.x, we use the deprecated (ext: String) signature. - * In Spark 4.1.0+, we use the new (spec: FileNameSpec) signature. - */ -object FileCommitProtocolShims { - @scala.annotation.nowarn( - "msg=method newTaskTempFile in class FileCommitProtocol is deprecated" - ) - def newTaskTempFile( - committer: FileCommitProtocol, - taskContext: TaskAttemptContext, - dir: Option[String], - ext: String): String = { - committer.newTaskTempFile(taskContext, dir, ext) - } - - @scala.annotation.nowarn( - "msg=method newTaskTempFileAbsPath in class FileCommitProtocol is deprecated" - ) - def newTaskTempFileAbsPath( - committer: FileCommitProtocol, - taskContext: TaskAttemptContext, - absoluteDir: String, - ext: String): String = { - committer.newTaskTempFileAbsPath(taskContext, absoluteDir, ext) - } -} diff --git a/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/OriginContextShim.scala b/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/OriginContextShim.scala deleted file mode 100644 index 75f9f446197..00000000000 --- a/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/OriginContextShim.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright (c) 2026, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/*** spark-rapids-shim-json-lines -{"spark": "330"} -{"spark": "331"} -{"spark": "332"} -{"spark": "333"} -{"spark": "334"} -spark-rapids-shim-json-lines ***/ -package org.apache.spark.sql.rapids.shims - -import org.apache.spark.sql.catalyst.trees.Origin - -// Apache Spark 3.3.x carries SPARK-39175 with `Origin.context: String`. -object OriginContextShim { - def queryContext(origin: Origin): String = origin.context - def contextSummary(origin: Origin): String = origin.context -} diff --git a/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/SparkSessionUtils.scala b/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/SparkSessionUtils.scala deleted file mode 100644 index c7ec03facf4..00000000000 --- a/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/SparkSessionUtils.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (c) 2025-2026, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/*** spark-rapids-shim-json-lines -{"spark": "330"} -{"spark": "330db"} -{"spark": "331"} -{"spark": "332"} -{"spark": "332db"} -{"spark": "333"} -{"spark": "334"} -{"spark": "340"} -{"spark": "341"} -{"spark": "341db"} -{"spark": "342"} -{"spark": "343"} -{"spark": "344"} -{"spark": "350"} -{"spark": "350db143"} -{"spark": "351"} -{"spark": "352"} -{"spark": "353"} -{"spark": "354"} -{"spark": "355"} -{"spark": "356"} -{"spark": "357"} -{"spark": "358"} -spark-rapids-shim-json-lines ***/ -package org.apache.spark.sql.rapids.shims - -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.execution.SparkPlan - -object SparkSessionUtils { - - def sessionFromPlan(plan: SparkPlan): SparkSession = { - plan.session - } - - def leafNodeDefaultParallelism(ss: SparkSession): Int = { - ss.leafNodeDefaultParallelism - } -} diff --git a/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/SparkUpgradeExceptionShims.scala b/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/SparkUpgradeExceptionShims.scala deleted file mode 100644 index 960384cd4e9..00000000000 --- a/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/SparkUpgradeExceptionShims.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (c) 2022-2023, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/*** spark-rapids-shim-json-lines -{"spark": "330"} -{"spark": "331"} -{"spark": "332"} -{"spark": "333"} -{"spark": "334"} -spark-rapids-shim-json-lines ***/ -package org.apache.spark.sql.rapids.shims - -import org.apache.spark.SparkUpgradeException - -object SparkUpgradeExceptionShims { - - def newSparkUpgradeException( - version: String, - message: String, - cause: Throwable): SparkUpgradeException = { - new SparkUpgradeException( - "INCONSISTENT_BEHAVIOR_CROSS_VERSION", - Array(version, message), - cause) - } - - // Used in tests to compare the class seen in an exception to - // `SparkUpgradeException` which is private in Spark - def getSparkUpgradeExceptionClass: Class[_] = { - classOf[SparkUpgradeException] - } -} diff --git a/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/TrampolineConnectShims.scala b/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/TrampolineConnectShims.scala deleted file mode 100644 index e0dd693ac85..00000000000 --- a/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/TrampolineConnectShims.scala +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright (c) 2025-2026, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -/*** spark-rapids-shim-json-lines -{"spark": "330"} -{"spark": "330db"} -{"spark": "331"} -{"spark": "332"} -{"spark": "332db"} -{"spark": "333"} -{"spark": "334"} -{"spark": "340"} -{"spark": "341"} -{"spark": "341db"} -{"spark": "342"} -{"spark": "343"} -{"spark": "344"} -{"spark": "350"} -{"spark": "350db143"} -{"spark": "351"} -{"spark": "352"} -{"spark": "353"} -{"spark": "354"} -{"spark": "355"} -{"spark": "356"} -{"spark": "357"} -{"spark": "358"} -spark-rapids-shim-json-lines ***/ -package org.apache.spark.sql.rapids.shims - -import org.apache.avro.Schema - -import org.apache.spark.sql.{Dataset, SparkSession} -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan - -object TrampolineConnectShims { - - type SparkSession = org.apache.spark.sql.SparkSession - type DataFrame = org.apache.spark.sql.DataFrame - type Dataset = org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] - - def cleanupAnyExistingSession(): Unit = SparkSession.cleanupAnyExistingSession() - - def getActiveSession: SparkSession = { - SparkSession.getActiveSession.getOrElse( - throw new IllegalStateException("No active SparkSession found") - ) - } - - def createSchemaParser(): Schema.Parser = { - new Schema.Parser().setValidateDefaults(false).setValidate(false) - } - - def createDataFrame(spark: SparkSession, plan: LogicalPlan): DataFrame = { - Dataset.ofRows(spark, plan) - } - - def getBuilder(): SparkSession.Builder = { - SparkSession.builder() - } - - def hasActiveSession: Boolean = { - SparkSession.getActiveSession.isDefined - } -} diff --git a/sql-plugin/src/main/spark330/scala/org/apache/spark/storage/ShuffleClientShims.scala b/sql-plugin/src/main/spark330/scala/org/apache/spark/storage/ShuffleClientShims.scala deleted file mode 100644 index 79dab4e56e0..00000000000 --- a/sql-plugin/src/main/spark330/scala/org/apache/spark/storage/ShuffleClientShims.scala +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright (c) 2026, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/*** spark-rapids-shim-json-lines -{"spark": "330"} -{"spark": "330db"} -{"spark": "331"} -{"spark": "332"} -{"spark": "332db"} -{"spark": "333"} -{"spark": "334"} -{"spark": "340"} -{"spark": "341"} -{"spark": "341db"} -{"spark": "342"} -{"spark": "343"} -{"spark": "344"} -{"spark": "350"} -{"spark": "350db143"} -{"spark": "351"} -{"spark": "352"} -{"spark": "353"} -{"spark": "354"} -{"spark": "355"} -{"spark": "356"} -{"spark": "357"} -{"spark": "358"} -{"spark": "400"} -{"spark": "401"} -{"spark": "402"} -{"spark": "411"} -spark-rapids-shim-json-lines ***/ -package org.apache.spark.storage - -import org.apache.spark.network.shuffle.BlockStoreClient -import org.apache.spark.network.shuffle.checksum.Cause - -object ShuffleClientShims { - def diagnoseCorruption( - client: BlockStoreClient, - host: String, - port: Int, - execId: String, - blockId: BlockId, - checksum: Long, - algorithm: String): Cause = { - blockId match { - case shuffleBlock: ShuffleBlockId => - client.diagnoseCorruption(host, port, execId, - shuffleBlock.shuffleId, shuffleBlock.mapId, shuffleBlock.reduceId, - checksum, algorithm) - case _ => - throw new IllegalArgumentException(s"Unexpected block type: ${blockId.getClass}") - } - } -}