From bb08a8933417e6a5e331e45d1772efae082804fa Mon Sep 17 00:00:00 2001 From: Gera Shegalov Date: Wed, 10 Jun 2026 08:33:24 -0700 Subject: [PATCH] Remove old Spark 3.3.1 through 3.4 shim sources Signed-off-by: Gera Shegalov --- .../rapids/shims/Spark331PlusNonDBShims.scala | 25 +------ .../spark/rapids/shims/SparkShims.scala | 11 ++-- .../spark331/SparkShimServiceProvider.scala | 36 ---------- .../spark332/SparkShimServiceProvider.scala | 36 ---------- .../rapids/shims/Spark332PlusDBShims.scala | 20 ++---- .../rapids/shims/WriteFilesExecRule.scala} | 36 +++++----- .../spark332db/SparkShimServiceProvider.scala | 37 ----------- .../sql/hive/rapids/shims/HiveFileUtil.scala | 10 ++- .../shims/SparkDateTimeExceptionShims.scala | 58 ----------------- .../spark333/SparkShimServiceProvider.scala | 36 ---------- .../spark334/SparkShimServiceProvider.scala | 36 ---------- ...eSizeTooLongUnsuccessfulErrorBuilder.scala | 42 ------------ .../rapids/shims/OrcProtoWriterShim.scala | 65 ++++++++++++++++--- .../rapids/shims/Spark340PlusNonDBShims.scala | 19 ++---- .../spark340/SparkShimServiceProvider.scala | 36 ---------- .../shuffle/RapidsShuffleIterator.scala | 30 ++++++++- .../apache/spark/sql/errors/ConvUtils.scala | 44 ------------- .../sql/rapids/RapidsCachingReader.scala | 17 ++++- .../sql/rapids/shims/OriginContextShim.scala | 50 -------------- .../spark341/SparkShimServiceProvider.scala | 36 ---------- .../shims/GpuWindowGroupLimitExec.scala | 4 +- .../spark341db/SparkShimServiceProvider.scala | 37 ----------- .../python/shims/GpuArrowPythonRunner.scala | 17 +++++ .../shims/GpuCoGroupedArrowPythonRunner.scala | 16 +++++ .../shims/GpuGroupedPythonRunnerFactory.scala | 4 +- .../spark342/SparkShimServiceProvider.scala | 36 ---------- .../spark343/SparkShimServiceProvider.scala | 36 ---------- .../spark344/SparkShimServiceProvider.scala | 36 ---------- 28 files changed, 177 insertions(+), 689 deletions(-) delete mode 100644 sql-plugin/src/main/spark331/scala/com/nvidia/spark/rapids/shims/spark331/SparkShimServiceProvider.scala delete mode 100644 sql-plugin/src/main/spark332/scala/com/nvidia/spark/rapids/shims/spark332/SparkShimServiceProvider.scala rename sql-plugin/src/main/spark332db/scala/{org/apache/spark/sql/rapids/shims/SparkUpgradeExceptionShims.scala => com/nvidia/spark/rapids/shims/WriteFilesExecRule.scala} (53%) delete mode 100644 sql-plugin/src/main/spark332db/scala/com/nvidia/spark/rapids/shims/spark332db/SparkShimServiceProvider.scala delete mode 100644 sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/rapids/shims/SparkDateTimeExceptionShims.scala delete mode 100644 sql-plugin/src/main/spark333/scala/com/nvidia/spark/rapids/shims/spark333/SparkShimServiceProvider.scala delete mode 100644 sql-plugin/src/main/spark334/scala/com/nvidia/spark/rapids/shims/spark334/SparkShimServiceProvider.scala delete mode 100644 sql-plugin/src/main/spark334/scala/org/apache/spark/sql/rapids/shims/SequenceSizeTooLongUnsuccessfulErrorBuilder.scala delete mode 100644 sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/spark340/SparkShimServiceProvider.scala delete mode 100644 sql-plugin/src/main/spark340/scala/org/apache/spark/sql/errors/ConvUtils.scala delete mode 100644 sql-plugin/src/main/spark340/scala/org/apache/spark/sql/rapids/shims/OriginContextShim.scala delete mode 100644 sql-plugin/src/main/spark341/scala/com/nvidia/spark/rapids/shims/spark341/SparkShimServiceProvider.scala delete mode 100644 sql-plugin/src/main/spark341db/scala/com/nvidia/spark/rapids/shims/spark341db/SparkShimServiceProvider.scala delete mode 100644 sql-plugin/src/main/spark342/scala/com/nvidia/spark/rapids/shims/spark342/SparkShimServiceProvider.scala delete mode 100644 sql-plugin/src/main/spark343/scala/com/nvidia/spark/rapids/shims/spark343/SparkShimServiceProvider.scala delete mode 100644 sql-plugin/src/main/spark344/scala/com/nvidia/spark/rapids/shims/spark344/SparkShimServiceProvider.scala diff --git a/sql-plugin/src/main/spark331/scala/com/nvidia/spark/rapids/shims/Spark331PlusNonDBShims.scala b/sql-plugin/src/main/spark331/scala/com/nvidia/spark/rapids/shims/Spark331PlusNonDBShims.scala index a65872ff22e..c36d13c15ed 100644 --- a/sql-plugin/src/main/spark331/scala/com/nvidia/spark/rapids/shims/Spark331PlusNonDBShims.scala +++ b/sql-plugin/src/main/spark331/scala/com/nvidia/spark/rapids/shims/Spark331PlusNonDBShims.scala @@ -40,31 +40,12 @@ spark-rapids-shim-json-lines ***/ package com.nvidia.spark.rapids.shims -import com.nvidia.spark.rapids.{ExprChecks, ExprRule, GpuCast, GpuExpression, GpuOverrides, TypeSig, UnaryExprMeta} +import com.nvidia.spark.rapids.ExprRule -import org.apache.spark.sql.catalyst.expressions.{CheckOverflowInTableInsert, Expression} -import org.apache.spark.sql.rapids.GpuCheckOverflowInTableInsert +import org.apache.spark.sql.catalyst.expressions.Expression trait Spark331PlusNonDBShims extends Spark330PlusNonDBShims { override def getExprs: Map[Class[_ <: Expression], ExprRule[_ <: Expression]] = { - val map: Map[Class[_ <: Expression], ExprRule[_ <: Expression]] = Seq( - // Add expression CheckOverflowInTableInsert starting Spark-3.3.1+ - // Accepts all types as input as the child Cast does the type checking and the calculations. - GpuOverrides.expr[CheckOverflowInTableInsert]( - "Casting a numeric value as another numeric type in store assignment", - ExprChecks.unaryProjectInputMatchesOutput( - TypeSig.all, - TypeSig.all), - (t, conf, p, r) => new UnaryExprMeta[CheckOverflowInTableInsert](t, conf, p, r) { - override def convertToGpu(child: Expression): GpuExpression = { - child match { - case c: GpuCast => GpuCheckOverflowInTableInsert(c, t.columnName) - case _ => - throw new IllegalStateException("Expression child is not of Type GpuCast") - } - } - }) - ).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap - super.getExprs ++ map + super.getExprs ++ CheckOverflowInTableInsertShims.exprs } } diff --git a/sql-plugin/src/main/spark331/scala/com/nvidia/spark/rapids/shims/SparkShims.scala b/sql-plugin/src/main/spark331/scala/com/nvidia/spark/rapids/shims/SparkShims.scala index 12d59e845d6..f90cc53fdeb 100644 --- a/sql-plugin/src/main/spark331/scala/com/nvidia/spark/rapids/shims/SparkShims.scala +++ b/sql-plugin/src/main/spark331/scala/com/nvidia/spark/rapids/shims/SparkShims.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2023, NVIDIA CORPORATION. + * Copyright (c) 2022-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,14 +24,15 @@ package com.nvidia.spark.rapids.shims import com.nvidia.spark.rapids._ -import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, DataWritingCommand, RunnableCommand} +import org.apache.spark.sql.execution.command.{DataWritingCommand, RunnableCommand} object SparkShimImpl extends Spark331PlusNonDBShims with AnsiCastRuleShims { override def getDataWriteCmds: Map[Class[_ <: DataWritingCommand], DataWritingCommandRule[_ <: DataWritingCommand]] = { - Seq(GpuOverrides.dataWriteCmd[CreateDataSourceTableAsSelectCommand]( - "Create table with select command", - (a, conf, p, r) => new CreateDataSourceTableAsSelectCommandMeta(a, conf, p, r)) + Seq( + GpuOverrides.dataWriteCmdFromShim( + CreateDataSourceTableAsSelectRules.dataWriteCmd, + (a, conf, p, r) => new CreateDataSourceTableAsSelectCommandMeta(a, conf, p, r)) ).map(r => (r.getClassFor.asSubclass(classOf[DataWritingCommand]), r)).toMap } diff --git a/sql-plugin/src/main/spark331/scala/com/nvidia/spark/rapids/shims/spark331/SparkShimServiceProvider.scala b/sql-plugin/src/main/spark331/scala/com/nvidia/spark/rapids/shims/spark331/SparkShimServiceProvider.scala deleted file mode 100644 index db631bdfb63..00000000000 --- a/sql-plugin/src/main/spark331/scala/com/nvidia/spark/rapids/shims/spark331/SparkShimServiceProvider.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2022-2023, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/*** spark-rapids-shim-json-lines -{"spark": "331"} -spark-rapids-shim-json-lines ***/ -package com.nvidia.spark.rapids.shims.spark331 - -import com.nvidia.spark.rapids.SparkShimVersion - -object SparkShimServiceProvider { - val VERSION = SparkShimVersion(3, 3, 1) - val VERSIONNAMES = Seq(s"$VERSION") -} - -class SparkShimServiceProvider extends com.nvidia.spark.rapids.SparkShimServiceProvider { - - override def getShimVersion: SparkShimVersion = SparkShimServiceProvider.VERSION - - def matchesVersion(version: String): Boolean = { - SparkShimServiceProvider.VERSIONNAMES.contains(version) - } -} diff --git a/sql-plugin/src/main/spark332/scala/com/nvidia/spark/rapids/shims/spark332/SparkShimServiceProvider.scala b/sql-plugin/src/main/spark332/scala/com/nvidia/spark/rapids/shims/spark332/SparkShimServiceProvider.scala deleted file mode 100644 index 06be70cb21b..00000000000 --- a/sql-plugin/src/main/spark332/scala/com/nvidia/spark/rapids/shims/spark332/SparkShimServiceProvider.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2022-2023, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/*** spark-rapids-shim-json-lines -{"spark": "332"} -spark-rapids-shim-json-lines ***/ -package com.nvidia.spark.rapids.shims.spark332 - -import com.nvidia.spark.rapids.SparkShimVersion - -object SparkShimServiceProvider { - val VERSION = SparkShimVersion(3, 3, 2) - val VERSIONNAMES = Seq(s"$VERSION") -} - -class SparkShimServiceProvider extends com.nvidia.spark.rapids.SparkShimServiceProvider { - - override def getShimVersion: SparkShimVersion = SparkShimServiceProvider.VERSION - - def matchesVersion(version: String): Boolean = { - SparkShimServiceProvider.VERSIONNAMES.contains(version) - } -} diff --git a/sql-plugin/src/main/spark332db/scala/com/nvidia/spark/rapids/shims/Spark332PlusDBShims.scala b/sql-plugin/src/main/spark332db/scala/com/nvidia/spark/rapids/shims/Spark332PlusDBShims.scala index ad235624b6b..83b40cfa7f3 100644 --- a/sql-plugin/src/main/spark332db/scala/com/nvidia/spark/rapids/shims/Spark332PlusDBShims.scala +++ b/sql-plugin/src/main/spark332db/scala/com/nvidia/spark/rapids/shims/Spark332PlusDBShims.scala @@ -26,8 +26,7 @@ import com.nvidia.spark.rapids._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, DataWritingCommand, RunnableCommand} -import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.command.{DataWritingCommand, RunnableCommand} trait Spark332PlusDBShims extends Spark330PlusDBShims { // AnsiCast is removed from Spark3.4.0 @@ -47,19 +46,8 @@ trait Spark332PlusDBShims extends Spark330PlusDBShims { super.getExprs ++ shimExprs } - private val shimExecs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] = Seq( - GpuOverrides.exec[WriteFilesExec]( - "v1 write files", - // WriteFilesExec always has patterns: - // InsertIntoHadoopFsRelationCommand(WriteFilesExec) or InsertIntoHiveTable(WriteFilesExec) - // The parent node of `WriteFilesExec` will check the types, here just let type check pass - ExecChecks(TypeSig.all, TypeSig.all), - (write, conf, p, r) => new GpuWriteFilesMeta(write, conf, p, r) - ) - ).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap - override def getExecs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] = - super.getExecs ++ shimExecs + super.getExecs ++ WriteFilesExecRule.execs override def getDataWriteCmds: Map[Class[_ <: DataWritingCommand], DataWritingCommandRule[_ <: DataWritingCommand]] = { @@ -71,8 +59,8 @@ trait Spark332PlusDBShims extends Spark330PlusDBShims { override def getRunnableCmds: Map[Class[_ <: RunnableCommand], RunnableCommandRule[_ <: RunnableCommand]] = { Seq( - GpuOverrides.runnableCmd[CreateDataSourceTableAsSelectCommand]( - "Write to a data source", + GpuOverrides.runnableCmdFromShim( + CreateDataSourceTableAsSelectRules.runnableCmd, (a, conf, p, r) => new CreateDataSourceTableAsSelectCommandMeta(a, conf, p, r)) ).map(r => (r.getClassFor.asSubclass(classOf[RunnableCommand]), r)).toMap } diff --git a/sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/rapids/shims/SparkUpgradeExceptionShims.scala b/sql-plugin/src/main/spark332db/scala/com/nvidia/spark/rapids/shims/WriteFilesExecRule.scala similarity index 53% rename from sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/rapids/shims/SparkUpgradeExceptionShims.scala rename to sql-plugin/src/main/spark332db/scala/com/nvidia/spark/rapids/shims/WriteFilesExecRule.scala index acda623f772..cc331765954 100644 --- a/sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/rapids/shims/SparkUpgradeExceptionShims.scala +++ b/sql-plugin/src/main/spark332db/scala/com/nvidia/spark/rapids/shims/WriteFilesExecRule.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2026, NVIDIA CORPORATION. + * Copyright (c) 2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,25 +38,25 @@ {"spark": "402"} {"spark": "411"} spark-rapids-shim-json-lines ***/ -package org.apache.spark.sql.rapids.shims +package com.nvidia.spark.rapids.shims -import org.apache.spark.SparkUpgradeException +import com.nvidia.spark.rapids.{ExecChecks, ExecRule, GpuOverrides, TypeSig} -object SparkUpgradeExceptionShims { +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.datasources.GpuWriteFilesMeta - def newSparkUpgradeException( - version: String, - message: String, - cause: Throwable): SparkUpgradeException = { - new SparkUpgradeException( - "INCONSISTENT_BEHAVIOR_CROSS_VERSION", - Map(version -> message), - cause) - } - - // Used in tests to compare the class seen in an exception to - // `SparkUpgradeException` which is private in Spark - def getSparkUpgradeExceptionClass: Class[_] = { - classOf[SparkUpgradeException] +object WriteFilesExecRule { + val execs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] = { + Seq( + GpuOverrides.execFromShim( + WriteFilesExecShims.exec, + // WriteFilesExec always has patterns: + // InsertIntoHadoopFsRelationCommand(WriteFilesExec) or + // InsertIntoHiveTable(WriteFilesExec) + // The parent node of `WriteFilesExec` will check the types, here just let type check pass. + ExecChecks(TypeSig.all, TypeSig.all), + (write, conf, p, r) => new GpuWriteFilesMeta(write, conf, p, r) + ) + ).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap } } diff --git a/sql-plugin/src/main/spark332db/scala/com/nvidia/spark/rapids/shims/spark332db/SparkShimServiceProvider.scala b/sql-plugin/src/main/spark332db/scala/com/nvidia/spark/rapids/shims/spark332db/SparkShimServiceProvider.scala deleted file mode 100644 index be448c2d4ba..00000000000 --- a/sql-plugin/src/main/spark332db/scala/com/nvidia/spark/rapids/shims/spark332db/SparkShimServiceProvider.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2023-2025, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/*** spark-rapids-shim-json-lines -{"spark": "332db"} -spark-rapids-shim-json-lines ***/ -package com.nvidia.spark.rapids.shims.spark332db - -import com.nvidia.spark.rapids._ - -object SparkShimServiceProvider { - // DB version should conform to "major.minor" and has no patch version. - // Refer to VersionUtils.getVersionForJni - val VERSION = DatabricksShimVersion(3, 3, 2, "12.2") -} - -class SparkShimServiceProvider extends com.nvidia.spark.rapids.SparkShimServiceProvider { - - override def getShimVersion: ShimVersion = SparkShimServiceProvider.VERSION - - def matchesVersion(version: String): Boolean = { - DatabricksShimServiceProvider.matchesVersion("12.2.x") - } -} diff --git a/sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/hive/rapids/shims/HiveFileUtil.scala b/sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/hive/rapids/shims/HiveFileUtil.scala index 307f0e14665..37e1c8b86c2 100644 --- a/sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/hive/rapids/shims/HiveFileUtil.scala +++ b/sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/hive/rapids/shims/HiveFileUtil.scala @@ -45,9 +45,15 @@ import scala.util.control.NonFatal import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.spark.internal.Logging +object HiveFileUtil { + private val log = org.slf4j.LoggerFactory.getLogger(HiveFileUtil.getClass) + + private def logWarning(msg: => String): Unit = { + if (log.isWarnEnabled) { + log.warn(msg) + } + } -object HiveFileUtil extends Logging { // prior to Spark 3.4.0, this method was accessible via the SaveAsHiveFile trait, but // was removed in https://github.com/apache/spark/pull/39277 diff --git a/sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/rapids/shims/SparkDateTimeExceptionShims.scala b/sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/rapids/shims/SparkDateTimeExceptionShims.scala deleted file mode 100644 index caa7c84dcd7..00000000000 --- a/sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/rapids/shims/SparkDateTimeExceptionShims.scala +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright (c) 2022-2026, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/*** spark-rapids-shim-json-lines -{"spark": "332db"} -{"spark": "340"} -{"spark": "341"} -{"spark": "341db"} -{"spark": "342"} -{"spark": "343"} -{"spark": "344"} -{"spark": "350"} -{"spark": "350db143"} -{"spark": "351"} -{"spark": "352"} -{"spark": "353"} -{"spark": "354"} -{"spark": "355"} -{"spark": "356"} -{"spark": "357"} -{"spark": "358"} -{"spark": "400"} -{"spark": "400db173"} -{"spark": "401"} -{"spark": "402"} -{"spark": "411"} -spark-rapids-shim-json-lines ***/ -package org.apache.spark.sql.rapids.shims - -import org.apache.spark.{QueryContext, SparkDateTimeException} - -object SparkDateTimeExceptionShims { - - def newSparkDateTimeException( - errorClass: String, - messageParameters: Map[String, String], - context: Array[QueryContext], - summary: String): SparkDateTimeException = { - new SparkDateTimeException( - errorClass, - messageParameters, - context, - summary) - } -} diff --git a/sql-plugin/src/main/spark333/scala/com/nvidia/spark/rapids/shims/spark333/SparkShimServiceProvider.scala b/sql-plugin/src/main/spark333/scala/com/nvidia/spark/rapids/shims/spark333/SparkShimServiceProvider.scala deleted file mode 100644 index f329546de6a..00000000000 --- a/sql-plugin/src/main/spark333/scala/com/nvidia/spark/rapids/shims/spark333/SparkShimServiceProvider.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2023, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/*** spark-rapids-shim-json-lines -{"spark": "333"} -spark-rapids-shim-json-lines ***/ -package com.nvidia.spark.rapids.shims.spark333 - -import com.nvidia.spark.rapids.SparkShimVersion - -object SparkShimServiceProvider { - val VERSION = SparkShimVersion(3, 3, 3) - val VERSIONNAMES = Seq(s"$VERSION") -} - -class SparkShimServiceProvider extends com.nvidia.spark.rapids.SparkShimServiceProvider { - - override def getShimVersion: SparkShimVersion = SparkShimServiceProvider.VERSION - - def matchesVersion(version: String): Boolean = { - SparkShimServiceProvider.VERSIONNAMES.contains(version) - } -} diff --git a/sql-plugin/src/main/spark334/scala/com/nvidia/spark/rapids/shims/spark334/SparkShimServiceProvider.scala b/sql-plugin/src/main/spark334/scala/com/nvidia/spark/rapids/shims/spark334/SparkShimServiceProvider.scala deleted file mode 100644 index 9742399c693..00000000000 --- a/sql-plugin/src/main/spark334/scala/com/nvidia/spark/rapids/shims/spark334/SparkShimServiceProvider.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2023, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/*** spark-rapids-shim-json-lines -{"spark": "334"} -spark-rapids-shim-json-lines ***/ -package com.nvidia.spark.rapids.shims.spark334 - -import com.nvidia.spark.rapids.SparkShimVersion - -object SparkShimServiceProvider { - val VERSION = SparkShimVersion(3, 3, 4) - val VERSIONNAMES = Seq(s"$VERSION") -} - -class SparkShimServiceProvider extends com.nvidia.spark.rapids.SparkShimServiceProvider { - - override def getShimVersion: SparkShimVersion = SparkShimServiceProvider.VERSION - - def matchesVersion(version: String): Boolean = { - SparkShimServiceProvider.VERSIONNAMES.contains(version) - } -} diff --git a/sql-plugin/src/main/spark334/scala/org/apache/spark/sql/rapids/shims/SequenceSizeTooLongUnsuccessfulErrorBuilder.scala b/sql-plugin/src/main/spark334/scala/org/apache/spark/sql/rapids/shims/SequenceSizeTooLongUnsuccessfulErrorBuilder.scala deleted file mode 100644 index b7b01b388b9..00000000000 --- a/sql-plugin/src/main/spark334/scala/org/apache/spark/sql/rapids/shims/SequenceSizeTooLongUnsuccessfulErrorBuilder.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright (c) 2024-2025, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/*** spark-rapids-shim-json-lines -{"spark": "334"} -{"spark": "342"} -{"spark": "343"} -{"spark": "344"} -{"spark": "351"} -{"spark": "352"} -{"spark": "353"} -{"spark": "354"} -{"spark": "355"} -{"spark": "356"} -{"spark": "357"} -{"spark": "358"} -spark-rapids-shim-json-lines ***/ -package org.apache.spark.sql.rapids.shims - -import org.apache.spark.unsafe.array.ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH - -trait SequenceSizeTooLongUnsuccessfulErrorBuilder { - def getTooLongSequenceErrorString(sequenceSize: Int, functionName: String): String = { - // The errant function's name does not feature in the exception message - // prior to Spark 4.0. Neither does the attempted allocation size. - "Unsuccessful try to create array with elements exceeding the array " + - s"size limit $MAX_ROUNDED_ARRAY_LENGTH" - } -} diff --git a/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/OrcProtoWriterShim.scala b/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/OrcProtoWriterShim.scala index 047f25fd910..0e812573805 100644 --- a/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/OrcProtoWriterShim.scala +++ b/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/OrcProtoWriterShim.scala @@ -40,23 +40,68 @@ spark-rapids-shim-json-lines ***/ package com.nvidia.spark.rapids.shims +import java.io.OutputStream +import java.lang.reflect.Method + import org.apache.orc.impl.OutStream -import org.apache.orc.protobuf.{AbstractMessage, CodedOutputStream} class OrcProtoWriterShim(orcOutStream: OutStream) { - val proxied = CodedOutputStream.newInstance(orcOutStream) - def writeAndFlush(obj: Any): Unit = obj match { - case m: AbstractMessage => - m.writeTo(proxied) - proxied.flush() - orcOutStream.flush() - case _ => - require(obj.isInstanceOf[AbstractMessage], - s"Unexpected protobuf message type: $obj") + import OrcProtoWriterShim.ProtoApi + + private[this] var proxiedApi: ProtoApi = _ + private[this] var proxied: AnyRef = _ + + private def proxiedFor(api: ProtoApi): AnyRef = { + if (proxiedApi != api) { + proxiedApi = api + proxied = api.newInstance.invoke(null, orcOutStream.asInstanceOf[OutputStream]) + } + proxied + } + + def writeAndFlush(obj: Any): Unit = { + val api = OrcProtoWriterShim.apiFor(obj).getOrElse { + throw new IllegalArgumentException( + s"requirement failed: Unexpected protobuf message type: $obj") + } + val currentProxied = proxiedFor(api) + api.writeTo.invoke(obj.asInstanceOf[AnyRef], currentProxied) + api.flush.invoke(currentProxied) + orcOutStream.flush() } } object OrcProtoWriterShim { + private case class ProtoApi( + messageClass: Class[_], + newInstance: Method, + writeTo: Method, + flush: Method) + + private val protoClassNames = Seq( + ("org.apache.orc.protobuf.AbstractMessage", + "org.apache.orc.protobuf.CodedOutputStream"), + ("com.google.protobuf.AbstractMessage", + "com.google.protobuf.CodedOutputStream")) + + private lazy val protoApis: Seq[ProtoApi] = protoClassNames.flatMap { case (msg, out) => + try { + val messageClass = Class.forName(msg) + val codedOutputStreamClass = Class.forName(out) + Some(ProtoApi( + messageClass, + codedOutputStreamClass.getMethod("newInstance", classOf[OutputStream]), + messageClass.getMethod("writeTo", codedOutputStreamClass), + codedOutputStreamClass.getMethod("flush"))) + } catch { + case _: ReflectiveOperationException => None + } + } + + private def apiFor(obj: Any): Option[ProtoApi] = { + protoApis.find(_.messageClass.isInstance(obj)) + } + def apply(orcOutStream: OutStream) = { new OrcProtoWriterShim(orcOutStream) } diff --git a/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/Spark340PlusNonDBShims.scala b/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/Spark340PlusNonDBShims.scala index 88c62eea41b..2276a9685db 100644 --- a/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/Spark340PlusNonDBShims.scala +++ b/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/Spark340PlusNonDBShims.scala @@ -43,8 +43,7 @@ import org.apache.spark.rapids.shims.GpuShuffleExchangeExec import org.apache.spark.sql.catalyst.expressions.{Empty2Null, Expression, KnownNullable, NamedExpression, SortOrder} import org.apache.spark.sql.catalyst.plans.physical.SinglePartition import org.apache.spark.sql.execution.{CollectLimitExec, GlobalLimitExec, SparkPlan, TakeOrderedAndProjectExec} -import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, DataWritingCommand, RunnableCommand} -import org.apache.spark.sql.execution.datasources.{GpuWriteFilesMeta, WriteFilesExec} +import org.apache.spark.sql.execution.command.{DataWritingCommand, RunnableCommand} import org.apache.spark.sql.execution.exchange.ENSURE_REQUIREMENTS import org.apache.spark.sql.rapids.GpuElementAtMeta import org.apache.spark.sql.rapids.GpuV1WriteUtils.GpuEmpty2Null @@ -121,19 +120,11 @@ trait Spark340PlusNonDBShims extends Spark331PlusNonDBShims { } ).disabledByDefault("Collect Limit replacement can be slower on the GPU, if huge number " + "of rows in a batch it could help by limiting the number of rows transferred from " + - "GPU to CPU"), - GpuOverrides.exec[WriteFilesExec]( - "v1 write files", - // WriteFilesExec always has patterns: - // InsertIntoHadoopFsRelationCommand(WriteFilesExec) or InsertIntoHiveTable(WriteFilesExec) - // The parent node of `WriteFilesExec` will check the types, here just let type check pass - ExecChecks(TypeSig.all, TypeSig.all), - (write, conf, p, r) => new GpuWriteFilesMeta(write, conf, p, r) - ) + "GPU to CPU") ).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap override def getExecs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] = - super.getExecs ++ shimExecs + super.getExecs ++ shimExecs ++ WriteFilesExecRule.execs // AnsiCast is removed from Spark3.4.0 override def ansiCastRule: ExprRule[_ <: Expression] = null @@ -173,8 +164,8 @@ trait Spark340PlusNonDBShims extends Spark331PlusNonDBShims { override def getRunnableCmds: Map[Class[_ <: RunnableCommand], RunnableCommandRule[_ <: RunnableCommand]] = { Seq( - GpuOverrides.runnableCmd[CreateDataSourceTableAsSelectCommand]( - "Write to a data source", + GpuOverrides.runnableCmdFromShim( + CreateDataSourceTableAsSelectRules.runnableCmd, (a, conf, p, r) => new CreateDataSourceTableAsSelectCommandMeta(a, conf, p, r)) ).map(r => (r.getClassFor.asSubclass(classOf[RunnableCommand]), r)).toMap } diff --git a/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/spark340/SparkShimServiceProvider.scala b/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/spark340/SparkShimServiceProvider.scala deleted file mode 100644 index 23581ba9a28..00000000000 --- a/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/spark340/SparkShimServiceProvider.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2022-2023, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/*** spark-rapids-shim-json-lines -{"spark": "340"} -spark-rapids-shim-json-lines ***/ -package com.nvidia.spark.rapids.shims.spark340 - -import com.nvidia.spark.rapids.SparkShimVersion - -object SparkShimServiceProvider { - val VERSION = SparkShimVersion(3, 4, 0) - val VERSIONNAMES = Seq(s"$VERSION") -} - -class SparkShimServiceProvider extends com.nvidia.spark.rapids.SparkShimServiceProvider { - - override def getShimVersion: SparkShimVersion = SparkShimServiceProvider.VERSION - - override def matchesVersion(version: String): Boolean = { - SparkShimServiceProvider.VERSIONNAMES.contains(version) - } -} diff --git a/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleIterator.scala b/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleIterator.scala index 4bf7215e2a0..b9966a7840a 100644 --- a/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleIterator.scala +++ b/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleIterator.scala @@ -49,7 +49,6 @@ import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion import com.nvidia.spark.rapids.jni.RmmSpark import org.apache.spark.TaskContext -import org.apache.spark.internal.Logging import org.apache.spark.shuffle.rapids.{RapidsShuffleFetchFailedException, RapidsShuffleTimeoutException} import org.apache.spark.sql.rapids.{GpuShuffleEnv, ShuffleMetricsUpdater} import org.apache.spark.sql.types.DataType @@ -81,8 +80,33 @@ class RapidsShuffleIterator( taskAttemptId: Long, catalog: ShuffleReceivedBufferCatalog = GpuShuffleEnv.getReceivedCatalog, timeoutSeconds: Long = GpuShuffleEnv.shuffleFetchTimeoutSeconds) - extends Iterator[ColumnarBatch] - with Logging { + extends Iterator[ColumnarBatch] { + private[this] val log = org.slf4j.LoggerFactory.getLogger(getClass) + + private def logInfo(msg: => String): Unit = { + if (log.isInfoEnabled) { + log.info(msg) + } + } + + private def logDebug(msg: => String): Unit = { + if (log.isDebugEnabled) { + log.debug(msg) + } + } + + private def logWarning(msg: => String): Unit = { + if (log.isWarnEnabled) { + log.warn(msg) + } + } + + private def logError(msg: => String): Unit = { + if (log.isErrorEnabled) { + log.error(msg) + } + } + /** * General trait encapsulating either a buffer or an error. Used to hand off batches diff --git a/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/errors/ConvUtils.scala b/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/errors/ConvUtils.scala deleted file mode 100644 index d9a669771a4..00000000000 --- a/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/errors/ConvUtils.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (c) 2025-2026, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/*** spark-rapids-shim-json-lines -{"spark": "340"} -{"spark": "341"} -{"spark": "341db"} -{"spark": "342"} -{"spark": "343"} -{"spark": "344"} -{"spark": "350"} -{"spark": "350db143"} -{"spark": "351"} -{"spark": "352"} -{"spark": "353"} -{"spark": "354"} -{"spark": "355"} -{"spark": "356"} -{"spark": "357"} -{"spark": "358"} -{"spark": "400"} -{"spark": "400db173"} -{"spark": "401"} -{"spark": "402"} -{"spark": "411"} -spark-rapids-shim-json-lines ***/ -package org.apache.spark.sql.errors - -object ConvUtils { - def overflowInConvError(): Unit = throw QueryExecutionErrors.overflowInConvError(null) -} diff --git a/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/rapids/RapidsCachingReader.scala b/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/rapids/RapidsCachingReader.scala index 913857c871f..97945e42523 100644 --- a/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/rapids/RapidsCachingReader.scala +++ b/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/rapids/RapidsCachingReader.scala @@ -46,7 +46,6 @@ import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.shuffle.{RapidsShuffleIterator, RapidsShuffleTransport} import org.apache.spark.{InterruptibleIterator, TaskContext} -import org.apache.spark.internal.Logging import org.apache.spark.shuffle.{ShuffleReader, ShuffleReadMetricsReporter} import org.apache.spark.sql.types.DataType import org.apache.spark.sql.vectorized.ColumnarBatch @@ -79,7 +78,21 @@ class RapidsCachingReader[K, C]( transport: Option[RapidsShuffleTransport], catalog: ShuffleBufferCatalog, sparkTypes: Array[DataType]) - extends ShuffleReader[K, C] with Logging { + extends ShuffleReader[K, C] { + private[this] val log = org.slf4j.LoggerFactory.getLogger(getClass) + + private def logInfo(msg: => String): Unit = { + if (log.isInfoEnabled) { + log.info(msg) + } + } + + private def logDebug(msg: => String): Unit = { + if (log.isDebugEnabled) { + log.debug(msg) + } + } + override def read(): Iterator[Product2[K, C]] = { NvtxRegistry.RAPIDS_CACHING_READER_READ.push() diff --git a/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/rapids/shims/OriginContextShim.scala b/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/rapids/shims/OriginContextShim.scala deleted file mode 100644 index 6de02a69703..00000000000 --- a/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/rapids/shims/OriginContextShim.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright (c) 2026, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/*** spark-rapids-shim-json-lines -{"spark": "340"} -{"spark": "341"} -{"spark": "341db"} -{"spark": "342"} -{"spark": "343"} -{"spark": "344"} -{"spark": "350"} -{"spark": "350db143"} -{"spark": "351"} -{"spark": "352"} -{"spark": "353"} -{"spark": "354"} -{"spark": "355"} -{"spark": "356"} -{"spark": "357"} -{"spark": "358"} -spark-rapids-shim-json-lines ***/ -package org.apache.spark.sql.rapids.shims - -import org.apache.spark.sql.catalyst.trees.{Origin, SQLQueryContext} - -// Apache Spark 3.4.x / 3.5.x typed `Origin.context` as `SQLQueryContext` -// directly, while some Databricks runtimes expose the wider `QueryContext`. -object OriginContextShim { - def queryContext(origin: Origin): SQLQueryContext = origin.context match { - case ctx: SQLQueryContext => ctx - case _ => null - } - def contextSummary(origin: Origin): String = origin.context match { - case null => "" - case ctx => ctx.summary - } -} diff --git a/sql-plugin/src/main/spark341/scala/com/nvidia/spark/rapids/shims/spark341/SparkShimServiceProvider.scala b/sql-plugin/src/main/spark341/scala/com/nvidia/spark/rapids/shims/spark341/SparkShimServiceProvider.scala deleted file mode 100644 index 38f9fd0307f..00000000000 --- a/sql-plugin/src/main/spark341/scala/com/nvidia/spark/rapids/shims/spark341/SparkShimServiceProvider.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2023, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/*** spark-rapids-shim-json-lines -{"spark": "341"} -spark-rapids-shim-json-lines ***/ -package com.nvidia.spark.rapids.shims.spark341 - -import com.nvidia.spark.rapids.SparkShimVersion - -object SparkShimServiceProvider { - val VERSION = SparkShimVersion(3, 4, 1) - val VERSIONNAMES = Seq(s"$VERSION") -} - -class SparkShimServiceProvider extends com.nvidia.spark.rapids.SparkShimServiceProvider { - - override def getShimVersion: SparkShimVersion = SparkShimServiceProvider.VERSION - - override def matchesVersion(version: String): Boolean = { - SparkShimServiceProvider.VERSIONNAMES.contains(version) - } -} diff --git a/sql-plugin/src/main/spark341db/scala/com/nvidia/spark/rapids/shims/GpuWindowGroupLimitExec.scala b/sql-plugin/src/main/spark341db/scala/com/nvidia/spark/rapids/shims/GpuWindowGroupLimitExec.scala index e385fee8a63..aa09c29cd2a 100644 --- a/sql-plugin/src/main/spark341db/scala/com/nvidia/spark/rapids/shims/GpuWindowGroupLimitExec.scala +++ b/sql-plugin/src/main/spark341db/scala/com/nvidia/spark/rapids/shims/GpuWindowGroupLimitExec.scala @@ -41,7 +41,6 @@ import com.nvidia.spark.rapids.Arm.withResource import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{splitSpillableInHalfByRows, withRetry} import com.nvidia.spark.rapids.window.{GpuDenseRank, GpuRank, GpuRowNumber} -import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, AttributeReference, DenseRank, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, NamedExpression, Rank, RowNumber, SortOrder, WindowExpression, WindowSpecDefinition} @@ -260,8 +259,7 @@ class GpuWindowGroupLimitingIterator(input: Iterator[ColumnarBatch], limit: Int, numOutputBatches: GpuMetric, numOutputRows: GpuMetric) - extends Iterator[ColumnarBatch] - with Logging { + extends Iterator[ColumnarBatch] { override def hasNext: Boolean = input.hasNext diff --git a/sql-plugin/src/main/spark341db/scala/com/nvidia/spark/rapids/shims/spark341db/SparkShimServiceProvider.scala b/sql-plugin/src/main/spark341db/scala/com/nvidia/spark/rapids/shims/spark341db/SparkShimServiceProvider.scala deleted file mode 100644 index 72ed2c7c067..00000000000 --- a/sql-plugin/src/main/spark341db/scala/com/nvidia/spark/rapids/shims/spark341db/SparkShimServiceProvider.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2023-2025, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/*** spark-rapids-shim-json-lines -{"spark": "341db"} -spark-rapids-shim-json-lines ***/ -package com.nvidia.spark.rapids.shims.spark341db - -import com.nvidia.spark.rapids._ - -object SparkShimServiceProvider { - // DB version should conform to "major.minor" and has no patch version. - // Refer to VersionUtils.getVersionForJni - val VERSION = DatabricksShimVersion(3, 4, 1, "13.3") -} - -class SparkShimServiceProvider extends com.nvidia.spark.rapids.SparkShimServiceProvider { - - override def getShimVersion: ShimVersion = SparkShimServiceProvider.VERSION - - def matchesVersion(version: String): Boolean = { - DatabricksShimServiceProvider.matchesVersion("13.3.x") - } -} diff --git a/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuArrowPythonRunner.scala b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuArrowPythonRunner.scala index 6ffa0abab53..77ee3005330 100644 --- a/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuArrowPythonRunner.scala +++ b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuArrowPythonRunner.scala @@ -35,6 +35,23 @@ import org.apache.spark.sql.rapids.shims.ArrowUtilsShim import org.apache.spark.sql.types.StructType import org.apache.spark.sql.vectorized.ColumnarBatch +// Keep executable line numbers aligned with pre-Spark-4 shims for binary-dedupe. + + + + + + + + + + + + + + + + /** * Similar to `PythonUDFRunner`, but exchange data with Python worker via Arrow stream. */ diff --git a/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuCoGroupedArrowPythonRunner.scala b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuCoGroupedArrowPythonRunner.scala index e665435eb25..dba5900e25c 100644 --- a/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuCoGroupedArrowPythonRunner.scala +++ b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuCoGroupedArrowPythonRunner.scala @@ -36,6 +36,22 @@ import org.apache.spark.sql.rapids.execution.python.{GpuArrowWriter, GpuPythonRu import org.apache.spark.sql.types.StructType import org.apache.spark.sql.vectorized.ColumnarBatch +// Keep executable line numbers aligned with pre-Spark-4 shims for binary-dedupe. + + + + + + + + + + + + + + + /** * Python UDF Runner for cogrouped UDFs, designed for `GpuFlatMapCoGroupsInPandasExec` only. * diff --git a/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupedPythonRunnerFactory.scala b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupedPythonRunnerFactory.scala index ea75745de6c..5b802ae838b 100644 --- a/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupedPythonRunnerFactory.scala +++ b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupedPythonRunnerFactory.scala @@ -26,14 +26,14 @@ import org.apache.spark.sql.rapids.shims.ArrowUtilsShim import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.ColumnarBatch -case class GpuGroupedPythonRunnerFactory( +class GpuGroupedPythonRunnerFactory( conf: org.apache.spark.sql.internal.SQLConf, chainedFunc: Seq[(ChainedPythonFunctions, Long)], argOffsets: Array[Array[Int]], dedupAttrs: StructType, pythonOutputSchema: StructType, evalType: Int, - argNames: Option[Array[Array[Option[String]]]] = None) { + argNames: Option[Array[Array[Option[String]]]]) extends Serializable { // Configs from DB runtime val maxBytes = conf.pandasZeroConfConversionGroupbyApplyMaxBytesPerSlice val zeroConfEnabled = conf.pandasZeroConfConversionGroupbyApplyEnabled diff --git a/sql-plugin/src/main/spark342/scala/com/nvidia/spark/rapids/shims/spark342/SparkShimServiceProvider.scala b/sql-plugin/src/main/spark342/scala/com/nvidia/spark/rapids/shims/spark342/SparkShimServiceProvider.scala deleted file mode 100644 index 6b00a7a762e..00000000000 --- a/sql-plugin/src/main/spark342/scala/com/nvidia/spark/rapids/shims/spark342/SparkShimServiceProvider.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2023, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/*** spark-rapids-shim-json-lines -{"spark": "342"} -spark-rapids-shim-json-lines ***/ -package com.nvidia.spark.rapids.shims.spark342 - -import com.nvidia.spark.rapids.SparkShimVersion - -object SparkShimServiceProvider { - val VERSION = SparkShimVersion(3, 4, 2) - val VERSIONNAMES = Seq(s"$VERSION") -} - -class SparkShimServiceProvider extends com.nvidia.spark.rapids.SparkShimServiceProvider { - - override def getShimVersion: SparkShimVersion = SparkShimServiceProvider.VERSION - - override def matchesVersion(version: String): Boolean = { - SparkShimServiceProvider.VERSIONNAMES.contains(version) - } -} diff --git a/sql-plugin/src/main/spark343/scala/com/nvidia/spark/rapids/shims/spark343/SparkShimServiceProvider.scala b/sql-plugin/src/main/spark343/scala/com/nvidia/spark/rapids/shims/spark343/SparkShimServiceProvider.scala deleted file mode 100644 index ff35d06a9c7..00000000000 --- a/sql-plugin/src/main/spark343/scala/com/nvidia/spark/rapids/shims/spark343/SparkShimServiceProvider.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/*** spark-rapids-shim-json-lines -{"spark": "343"} -spark-rapids-shim-json-lines ***/ -package com.nvidia.spark.rapids.shims.spark343 - -import com.nvidia.spark.rapids.SparkShimVersion - -object SparkShimServiceProvider { - val VERSION = SparkShimVersion(3, 4, 3) - val VERSIONNAMES = Seq(s"$VERSION") -} - -class SparkShimServiceProvider extends com.nvidia.spark.rapids.SparkShimServiceProvider { - - override def getShimVersion: SparkShimVersion = SparkShimServiceProvider.VERSION - - override def matchesVersion(version: String): Boolean = { - SparkShimServiceProvider.VERSIONNAMES.contains(version) - } -} diff --git a/sql-plugin/src/main/spark344/scala/com/nvidia/spark/rapids/shims/spark344/SparkShimServiceProvider.scala b/sql-plugin/src/main/spark344/scala/com/nvidia/spark/rapids/shims/spark344/SparkShimServiceProvider.scala deleted file mode 100644 index 80f042e0ee7..00000000000 --- a/sql-plugin/src/main/spark344/scala/com/nvidia/spark/rapids/shims/spark344/SparkShimServiceProvider.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/*** spark-rapids-shim-json-lines -{"spark": "344"} -spark-rapids-shim-json-lines ***/ -package com.nvidia.spark.rapids.shims.spark344 - -import com.nvidia.spark.rapids.SparkShimVersion - -object SparkShimServiceProvider { - val VERSION = SparkShimVersion(3, 4, 4) - val VERSIONNAMES = Seq(s"$VERSION") -} - -class SparkShimServiceProvider extends com.nvidia.spark.rapids.SparkShimServiceProvider { - - override def getShimVersion: SparkShimVersion = SparkShimServiceProvider.VERSION - - override def matchesVersion(version: String): Boolean = { - SparkShimServiceProvider.VERSIONNAMES.contains(version) - } -}