diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index 26b3de7f50890..d3ea9efda0662 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -8892,6 +8892,11 @@
       "No enough memory for aggregation"
     ]
   },
+  "_LEGACY_ERROR_TEMP_3303" : {
+    "message" : [
+      "The command <command> is unsafe to run after the Spark session has been stopped"
+    ]
+  },
   "_LEGACY_ERROR_USER_RAISED_EXCEPTION" : {
     "message" : [
       ""
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 2ec85a38723cb..a052b16a4a368 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -2901,4 +2901,10 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
       )
     )
   }
+
+  def unsafeCommandWhenSparkSessionStopped(command: String): SparkRuntimeException = {
+    new SparkRuntimeException(
+      errorClass = "_LEGACY_ERROR_TEMP_3303",
+      messageParameters = Map("command" -> command))
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 8a795f0748811..537ad9c33d1bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -112,12 +112,28 @@ case class InsertIntoHadoopFsRelationCommand(
     }
 
     val jobId = java.util.UUID.randomUUID().toString
+
+    val activeSession = Option(sparkSession).filter(_.isUsable)
     val committer = FileCommitProtocol.instantiate(
       sparkSession.sessionState.conf.fileCommitProtocolClass,
       jobId = jobId,
       outputPath = outputPath.toString,
       dynamicPartitionOverwrite = dynamicPartitionOverwrite)
 
+    def isSessionActiveAndUnchanged: Boolean = {
+      SparkSession.getActiveSession.zip(activeSession).exists {
+        case (sessionBefore, sessionAfter) => sessionBefore eq sessionAfter
+      }
+    }
+
+    // SPARK-48458: ensure that the Spark session captured when dynamicPartitionOverwrite
+    // was initialized is still the active one. Otherwise there is a risk of data loss
+    // if the committer falls back to the default static overwrite mode.
+    if (!isSessionActiveAndUnchanged) {
+      throw QueryExecutionErrors.unsafeCommandWhenSparkSessionStopped(command =
+        getClass.getSimpleName)
+    }
+
     val doInsertion = if (mode == SaveMode.Append) {
       true
     } else {
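
The guard added above hinges on reference identity: the session captured before the committer is built must be the very same object that is active when the command actually runs. Below is a minimal, self-contained Scala sketch of that pattern; Session, activeSession, and SessionGuardSketch are hypothetical stand-ins for illustration, not Spark APIs.

object SessionGuardSketch {
  // Stand-in for SparkSession; only reference identity matters for the guard.
  final class Session(val name: String) {
    def isUsable: Boolean = true
  }

  // Stand-in for SparkSession.getActiveSession.
  var activeSession: Option[Session] = None

  // Mirrors the diff's isSessionActiveAndUnchanged: true only when the currently
  // active session is the exact instance captured earlier.
  def isSessionActiveAndUnchanged(captured: Option[Session]): Boolean =
    activeSession.zip(captured).exists { case (now, before) => now eq before }

  def main(args: Array[String]): Unit = {
    val original = new Session("original")
    activeSession = Some(original)
    val captured = activeSession.filter(_.isUsable)

    println(isSessionActiveAndUnchanged(captured)) // true: same instance still active

    // Simulate the session being stopped and replaced before execution runs.
    activeSession = Some(new Session("replacement"))
    println(isSessionActiveAndUnchanged(captured)) // false: the real command would throw
  }
}

Note that the comparison uses eq rather than ==: a freshly created session with an identical configuration still fails the check, which is the point of the guard, since only the session that resolved dynamicPartitionOverwrite is known to be safe to write with.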