@@ -192,6 +192,7 @@ public enum LogKeys implements LogKey {
END_INDEX,
END_POINT,
END_VERSION,
ENFORCE_EXACTLY_ONCE,
ENGINE,
EPOCH,
ERROR,
79 changes: 79 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -5265,6 +5265,85 @@
],
"sqlState" : "42802"
},
"STATE_REPARTITION_INVALID_CHECKPOINT" : {
"message" : [
"The provided checkpoint location '<checkpointLocation>' is in an invalid state."
],
"subClass" : {
"LAST_BATCH_ABANDONED_REPARTITION" : {
"message" : [
"The last batch ID <lastBatchId> is a repartition batch with <lastBatchShufflePartitions> shuffle partitions and didn't finish successfully.",
"You're now requesting to repartition to <numPartitions> shuffle partitions.",
"Please retry with the same number of shuffle partitions as the previous attempt.",
"Once that completes successfully, you can repartition to another number of shuffle partitions."
]
},
"LAST_BATCH_FAILED" : {
"message" : [
"The last batch ID <lastBatchId> didn't finish successfully. Please make sure the streaming query finishes successfully, before repartitioning.",
"If using ProcessingTime trigger, you can use AvailableNow trigger instead, which will make sure the query terminates successfully by itself.",
"If you want to skip this check, set enforceExactlyOnceSink parameter in repartition to false.",
"But this can cause duplicate output records from the failed batch when using exactly-once sinks."
]
},
"MISSING_OFFSET_SEQ_METADATA" : {
"message" : [
"The OffsetSeq (v<version>) metadata is missing for batch ID <batchId>. Please make sure the checkpoint is from a supported Spark version."
]
},
"NO_BATCH_FOUND" : {
"message" : [
"No microbatch has been recorded in the checkpoint location. Make sure the streaming query has successfully completed at least one microbatch before repartitioning."
]
},
"NO_COMMITTED_BATCH" : {
"message" : [
"There is no committed microbatch. Make sure the streaming query has successfully completed at least one microbatch before repartitioning."
]
},
"OFFSET_SEQ_NOT_FOUND" : {
"message" : [
"Offset sequence entry for batch ID <batchId> not found. You might have set a very low value for",
"'spark.sql.streaming.minBatchesToRetain' config during the streaming query execution or you deleted files in the checkpoint location."
]
},
"SHUFFLE_PARTITIONS_ALREADY_MATCH" : {
"message" : [
"The number of shuffle partitions in the last committed batch (id=<batchId>) is the same as the requested <numPartitions> partitions.",
"Hence, already has the requested number of partitions, so no-op."
]
},
"UNSUPPORTED_OFFSET_SEQ_VERSION" : {
"message" : [
"Unsupported offset sequence version <version>. Please make sure the checkpoint is from a supported Spark version."
]
}
},
"sqlState" : "55019"
},
"STATE_REPARTITION_INVALID_PARAMETER" : {
Contributor: nit: INVALID_OPTIONS ?

@micheal-o (Author), Nov 15, 2025: I want to avoid confusion, since "options" in Spark means .option(). So using "parameter" instead.

"message" : [
"The repartition parameter <parameter> is invalid:"
],
"subClass" : {
"IS_EMPTY" : {
"message" : [
"cannot be empty."
]
},
"IS_NOT_GREATER_THAN_ZERO" : {
"message" : [
"must be greater than zero."
]
},
"IS_NULL" : {
"message" : [
"cannot be null."
]
}
},
"sqlState" : "42616"
},
"STATE_STORE_CANNOT_CREATE_COLUMN_FAMILY_WITH_RESERVED_CHARS" : {
"message" : [
"Failed to create column family with unsupported starting character and name=<colFamilyName>."
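For context, error conditions like these are raised under a condition name of the form `<CONDITION>.<SUB_CLASS>`, together with the parameters the message templates reference. A hedged sketch of how one of the entries above might surface, assuming Spark's standard `SparkException(errorClass, messageParameters, cause)` constructor; the parameter values are hypothetical:

```scala
import org.apache.spark.SparkException

// Sketch: raising LAST_BATCH_FAILED for an example checkpoint. The message
// text and sqlState (55019) are resolved from the JSON entry above; the
// path and batch ID below are made up for illustration.
throw new SparkException(
  errorClass = "STATE_REPARTITION_INVALID_CHECKPOINT.LAST_BATCH_FAILED",
  messageParameters = Map(
    "checkpointLocation" -> "/tmp/checkpoints/my-query",
    "lastBatchId" -> "41"),
  cause = null)
```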
New file: org.apache.spark.sql.streaming.StreamingCheckpointManager
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.streaming

/**
* A class to manage operations on streaming query checkpoints.
*/
private[spark] abstract class StreamingCheckpointManager {

/**
* Repartition the stateful streaming operators' state in the streaming checkpoint to have
* `numPartitions` partitions. The streaming query MUST NOT be running. If `numPartitions` is
* the same as the current number of partitions, there is nothing to do, and an exception is
* thrown to signal the no-op.
*
* This produces a new microbatch in the checkpoint that contains the repartitioned state,
* i.e. if the last streaming batch was batch `N`, this will create batch `N+1` with the
* repartitioned state. Note that this new batch doesn't read input data from sources; it only
* represents the repartition operation. The next time the streaming query is started, it will
* pick up from this new batch.
*
* This will return only when the repartitioning is complete or fails.
*
* @note
* This operation should only be performed after the streaming query has been stopped.
* @param checkpointLocation
* The checkpoint location of the streaming query, should be the `checkpointLocation` option
* on the DataStreamWriter.
* @param numPartitions
* the target number of state partitions.
* @param enforceExactlyOnceSink
* whether to disallow skipping failed batches, to avoid duplicate output in exactly-once sinks.
*/
private[spark] def repartition(
checkpointLocation: String,
numPartitions: Int,
enforceExactlyOnceSink: Boolean = true): Unit
}
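For orientation, a minimal usage sketch, not part of this PR's diff: since the API is `private[spark]`, the call below would only compile from code inside the Spark package tree, and the session type, checkpoint path, and partition count are all assumptions.

```scala
// Minimal sketch, assuming a classic SparkSession and a stopped query;
// the checkpoint path and partition count are hypothetical.
import org.apache.spark.sql.classic.SparkSession

val spark = SparkSession.builder().appName("repartition-state").getOrCreate()

// Repartition the query's state to 20 partitions. On success, batch N+1 is
// written to the checkpoint, and the next query start resumes from it.
spark.streamingCheckpointManager.repartition(
  checkpointLocation = "/tmp/checkpoints/my-query",
  numPartitions = 20,
  enforceExactlyOnceSink = true)
```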
@@ -145,6 +145,12 @@ class SQLContext private[sql] (override val sparkSession: SparkSession)
*/
def streams: StreamingQueryManager = sparkSession.streams

/**
* Returns a `StreamingCheckpointManager` that allows managing any streaming checkpoint.
*/
private[spark] def streamingCheckpointManager: StreamingCheckpointManager =
Contributor: We need to add to Spark connect also ?

@micheal-o (Author): Spark Connect and PySpark will be added in subsequent PRs.
sparkSession.streamingCheckpointManager

/** @inheritdoc */
override def sparkContext: SparkContext = super.sparkContext

@@ -223,6 +223,8 @@ class SparkSession private(
@Unstable
def streams: StreamingQueryManager = sessionState.streamingQueryManager

private[spark] def streamingCheckpointManager = sessionState.streamingCheckpointManager

/**
* Returns an `ArtifactManager` that supports adding, managing and using session-scoped artifacts
* (jars, classfiles, etc).
New file: org.apache.spark.sql.classic.StreamingCheckpointManager
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.classic

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.streaming.state.{OfflineStateRepartitionErrors, OfflineStateRepartitionRunner}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming

/** @inheritdoc */
private[spark] class StreamingCheckpointManager(
sparkSession: SparkSession,
sqlConf: SQLConf) extends streaming.StreamingCheckpointManager with Logging {

/** @inheritdoc */
override private[spark] def repartition(
checkpointLocation: String,
numPartitions: Int,
Contributor: I guess the underlying recorded value is Int, but should we consider bumping this to Long eventually? Probably unlikely for users to have that many partitions though.

@micheal-o (Author): Let's keep it as Int, since that is what we record in the checkpoint. Also, using Long is very unrealistic.
enforceExactlyOnceSink: Boolean = true): Unit = {
checkpointLocation match {
case null =>
throw OfflineStateRepartitionErrors.parameterIsNullError("checkpointLocation")
case "" =>
throw OfflineStateRepartitionErrors.parameterIsEmptyError("checkpointLocation")
case _ => // Valid case, no action needed
}

if (numPartitions <= 0) {
throw OfflineStateRepartitionErrors.parameterIsNotGreaterThanZeroError("numPartitions")
}

val runner = new OfflineStateRepartitionRunner(
Contributor: Should we encapsulate this whole block in a try-catch, in case we want to catch and log any warnings?

@micheal-o (Author): See the run method.

sparkSession,
checkpointLocation,
numPartitions,
enforceExactlyOnceSink
)
runner.run()
Contributor: Can we also add some logging to indicate when the repartition started/ended and the time it took to complete the operation, along with other identifying information about the query?

@micheal-o (Author): See the runner's run method.
}
}
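The `OfflineStateRepartitionErrors` helpers called in the validation above aren't shown in this diff. A hedged sketch of the shape they plausibly have, assuming Spark's `SparkException(errorClass, messageParameters, cause)` constructor and the `STATE_REPARTITION_INVALID_PARAMETER` conditions defined earlier; the actual helpers in the PR may differ:

```scala
import org.apache.spark.SparkException

// Sketch only: maps each invalid-parameter case to its error sub-class.
object OfflineStateRepartitionErrors {
  private def invalidParameter(subClass: String, parameter: String): SparkException =
    new SparkException(
      errorClass = s"STATE_REPARTITION_INVALID_PARAMETER.$subClass",
      messageParameters = Map("parameter" -> parameter),
      cause = null)

  def parameterIsNullError(parameter: String): SparkException =
    invalidParameter("IS_NULL", parameter)

  def parameterIsEmptyError(parameter: String): SparkException =
    invalidParameter("IS_EMPTY", parameter)

  def parameterIsNotGreaterThanZeroError(parameter: String): SparkException =
    invalidParameter("IS_NOT_GREATER_THAN_ZERO", parameter)
}
```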
@@ -42,6 +42,7 @@ import org.apache.spark.sql.execution.streaming.operators.stateful.transformwith
import org.apache.spark.sql.execution.streaming.runtime.StreamingCheckpointConstants.DIR_NAME_STATE
import org.apache.spark.sql.execution.streaming.runtime.StreamingQueryCheckpointMetadata
import org.apache.spark.sql.execution.streaming.state.{InMemoryStateSchemaProvider, KeyStateEncoderSpec, NoPrefixKeyStateEncoderSpec, PrefixKeyScanStateEncoderSpec, StateSchemaCompatibilityChecker, StateSchemaMetadata, StateSchemaProvider, StateStore, StateStoreColFamilySchema, StateStoreConf, StateStoreId, StateStoreProviderId}
import org.apache.spark.sql.execution.streaming.utils.StreamingUtils
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.streaming.TimeMode
import org.apache.spark.sql.types.StructType
@@ -481,7 +482,8 @@ object StateSourceOptions extends DataSourceOptions {
throw StateDataSourceErrors.conflictOptions(Seq(JOIN_SIDE, STORE_NAME))
}

val resolvedCpLocation = resolvedCheckpointLocation(hadoopConf, checkpointLocation)
val resolvedCpLocation = StreamingUtils.resolvedCheckpointLocation(
hadoopConf, checkpointLocation)

var batchId = Option(options.get(BATCH_ID)).map(_.toLong)

@@ -617,14 +619,6 @@ object StateSourceOptions extends DataSourceOptions {
startOperatorStateUniqueIds, endOperatorStateUniqueIds)
}

private def resolvedCheckpointLocation(
hadoopConf: Configuration,
checkpointLocation: String): String = {
val checkpointPath = new Path(checkpointLocation)
val fs = checkpointPath.getFileSystem(hadoopConf)
checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toUri.toString
}

private def getLastCommittedBatch(session: SparkSession, checkpointLocation: String): Long = {
val commitLog = new StreamingQueryCheckpointMetadata(session, checkpointLocation).commitLog
commitLog.getLatest() match {
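The private helper removed above was moved into the shared `StreamingUtils` imported at the top of the file, so both the state data source and the repartition code can resolve checkpoint paths the same way. Its new home isn't shown in this section; a sketch based on the deleted body, with the object's file location assumed from the import:

```scala
package org.apache.spark.sql.execution.streaming.utils

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object StreamingUtils {
  // Qualify the user-supplied checkpoint path against its file system, so
  // relative and scheme-less paths resolve to a stable, fully qualified URI.
  def resolvedCheckpointLocation(
      hadoopConf: Configuration,
      checkpointLocation: String): String = {
    val checkpointPath = new Path(checkpointLocation)
    val fs = checkpointPath.getFileSystem(hadoopConf)
    checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toUri.toString
  }
}
```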