diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index e469c9989f2c..a4a68ef1b09e 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -33,6 +33,7 @@ import org.apache.commons.io.output.{ByteArrayOutputStream => ApacheByteArrayOut
 import org.roaringbitmap.RoaringBitmap
 
 import org.apache.spark.broadcast.{Broadcast, BroadcastManager}
+import org.apache.spark.celeborn.CelebornShuffleState
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.io.CompressionCodec
@@ -839,6 +840,7 @@ private[spark] class MapOutputTrackerMaster(
         shuffleStatus.invalidateSerializedMergeOutputStatusCache()
       }
     }
+    CelebornShuffleState.unregisterCelebornSkewedShuffle(shuffleId)
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 0388c7b576b0..59fdc81b09dc 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.api.python.PythonWorkerFactory
 import org.apache.spark.broadcast.BroadcastManager
+import org.apache.spark.celeborn.CelebornShuffleState
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config._
 import org.apache.spark.memory.{MemoryManager, UnifiedMemoryManager}
@@ -414,6 +415,7 @@ object SparkEnv extends Logging {
     if (isDriver) {
       val sparkFilesDir = Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
       envInstance.driverTmpDir = Some(sparkFilesDir)
+      CelebornShuffleState.init(envInstance)
    }

    envInstance
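Note: the CelebornShuffleState.init hook added above runs only in the driver branch of SparkEnv.create and reads plain SparkConf entries (the entries themselves are defined in the new file that follows), so the feature is fixed at driver startup. A minimal sketch of a driver configuration under which every branch added by this patch becomes active; the shuffle manager class name below is an assumption about the Celeborn client and is not pinned down by this patch:

    // Hypothetical driver conf: any spark.shuffle.manager value containing
    // "celeborn" satisfies the check in CelebornShuffleState.init.
    val conf = new org.apache.spark.SparkConf()
      .set("spark.shuffle.manager", "org.apache.spark.shuffle.celeborn.SparkShuffleManager") // assumed name
      .set("spark.celeborn.client.adaptive.optimizeSkewedPartitionRead.enabled", "true")
      .set("spark.celeborn.client.spark.stageRerun.enabled", "true")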
diff --git a/core/src/main/scala/org/apache/spark/celeborn/CelebornShuffleState.scala b/core/src/main/scala/org/apache/spark/celeborn/CelebornShuffleState.scala
new file mode 100644
index 000000000000..5e190c512df2
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/celeborn/CelebornShuffleState.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.celeborn
+
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.config.ConfigBuilder
+
+object CelebornShuffleState {
+
+  private val CELEBORN_CLIENT_ADAPTIVE_OPTIMIZE_SKEWED_PARTITION_READ =
+    ConfigBuilder("spark.celeborn.client.adaptive.optimizeSkewedPartitionRead.enabled")
+      .booleanConf
+      .createWithDefault(false)
+
+  private val CELEBORN_STAGE_RERUN_ENABLED =
+    ConfigBuilder("spark.celeborn.client.spark.stageRerun.enabled")
+      .withAlternative("spark.celeborn.client.spark.fetch.throwsFetchFailure")
+      .booleanConf
+      .createWithDefault(false)
+
+  private val celebornOptimizeSkewedPartitionReadEnabled = new AtomicBoolean()
+  private val stageRerunEnabled = new AtomicBoolean()
+  private val skewShuffleIds = ConcurrentHashMap.newKeySet[Int]()
+
+  // Called from SparkEnv.create on the driver.
+  def init(env: SparkEnv): Unit = {
+    // Clean up any existing state, then initialize.
+    skewShuffleIds.clear()
+
+    // Use env.conf for all initialization, not SQLConf.
+    celebornOptimizeSkewedPartitionReadEnabled.set(
+      env.conf.get("spark.shuffle.manager", "sort").contains("celeborn") &&
+      env.conf.get(CELEBORN_CLIENT_ADAPTIVE_OPTIMIZE_SKEWED_PARTITION_READ))
+    stageRerunEnabled.set(env.conf.get(CELEBORN_STAGE_RERUN_ENABLED))
+  }
+
+  def unregisterCelebornSkewedShuffle(shuffleId: Int): Unit = {
+    skewShuffleIds.remove(shuffleId)
+  }
+
+  def registerCelebornSkewedShuffle(shuffleId: Int): Unit = {
+    skewShuffleIds.add(shuffleId)
+  }
+
+  def isCelebornSkewedShuffle(shuffleId: Int): Boolean = {
+    skewShuffleIds.contains(shuffleId)
+  }
+
+  def celebornAdaptiveOptimizeSkewedPartitionReadEnabled: Boolean = {
+    celebornOptimizeSkewedPartitionReadEnabled.get()
+  }
+
+  def celebornStageRerunEnabled: Boolean = {
+    stageRerunEnabled.get()
+  }
+
+}
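For orientation, a minimal sketch of the lifecycle this registry implements, pieced together from the call sites added elsewhere in this patch (the shuffle id 7 is illustrative):

    import org.apache.spark.celeborn.CelebornShuffleState

    // ShufflePartitionsUtil registers a shuffle once it emits Celeborn-specific skew splits:
    CelebornShuffleState.registerCelebornSkewedShuffle(7)
    // DAGScheduler treats a FetchFailed on a registered shuffle as non-retryable:
    assert(CelebornShuffleState.isCelebornSkewedShuffle(7))
    // MapOutputTrackerMaster.unregisterShuffle drops the entry together with the shuffle:
    CelebornShuffleState.unregisterCelebornSkewedShuffle(7)
    assert(!CelebornShuffleState.isCelebornSkewedShuffle(7))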
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b950c07f3d83..2cb430c3c3d7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -33,6 +33,7 @@ import com.google.common.util.concurrent.{Futures, SettableFuture}
 
 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.celeborn.CelebornShuffleState
 import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
@@ -1780,7 +1781,7 @@ private[spark] class DAGScheduler(
             failedStage.failedAttemptIds.add(task.stageAttemptId)
             val shouldAbortStage =
               failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts ||
-                disallowStageRetryForTest
+                disallowStageRetryForTest || CelebornShuffleState.isCelebornSkewedShuffle(shuffleId)
 
             // It is likely that we receive multiple FetchFailed for a single stage (because we have
             // multiple tasks running concurrently on different executors). In that case, it is
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CelebornShuffleUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CelebornShuffleUtil.scala
new file mode 100644
index 000000000000..3dc606784617
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CelebornShuffleUtil.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import java.util.Locale
+
+import org.apache.spark.sql.execution.exchange.{ShuffleExchangeExec, ShuffleExchangeLike}
+
+object CelebornShuffleUtil {
+
+  def isCelebornShuffle(shuffleExchangeLike: ShuffleExchangeLike): Boolean = {
+    shuffleExchangeLike match {
+      case exec: ShuffleExchangeExec =>
+        exec.shuffleDependency.shuffleHandle
+          .getClass.getName.toLowerCase(Locale.ROOT).contains("celeborn")
+      case _ => false
+    }
+  }
+
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewInRebalancePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewInRebalancePartitions.scala
index 1752907a9a54..2c6a49b78ebb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewInRebalancePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewInRebalancePartitions.scala
@@ -50,12 +50,13 @@ object OptimizeSkewInRebalancePartitions extends AQEShuffleReadRule {
   private def optimizeSkewedPartitions(
       shuffleId: Int,
       bytesByPartitionId: Array[Long],
-      targetSize: Long): Seq[ShufflePartitionSpec] = {
+      targetSize: Long,
+      isCelebornShuffle: Boolean = false): Seq[ShufflePartitionSpec] = {
     bytesByPartitionId.indices.flatMap { reduceIndex =>
       val bytes = bytesByPartitionId(reduceIndex)
       if (bytes > targetSize) {
         val newPartitionSpec =
-          ShufflePartitionsUtil.createSkewPartitionSpecs(shuffleId, reduceIndex, targetSize)
+          ShufflePartitionsUtil.createSkewPartitionSpecs(shuffleId, reduceIndex, targetSize, isCelebornShuffle)
         if (newPartitionSpec.isEmpty) {
           CoalescedPartitionSpec(reduceIndex, reduceIndex + 1, bytes) :: Nil
         } else {
@@ -77,8 +78,9 @@ object OptimizeSkewInRebalancePartitions extends AQEShuffleReadRule {
       return shuffle
     }
 
+    val isCelebornShuffle = CelebornShuffleUtil.isCelebornShuffle(shuffle.shuffle)
     val newPartitionsSpec = optimizeSkewedPartitions(
-      mapStats.get.shuffleId, mapStats.get.bytesByPartitionId, advisorySize)
+      mapStats.get.shuffleId, mapStats.get.bytesByPartitionId, advisorySize, isCelebornShuffle)
     // return origin plan if we can not optimize partitions
     if (newPartitionsSpec.length == mapStats.get.bytesByPartitionId.length) {
       shuffle
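The detection in CelebornShuffleUtil is deliberately loose: it matches only on the shuffle handle's class name, so the sql/core module gains no compile-time dependency on the Celeborn client. A sketch of what it does and does not match; the Celeborn handle class name is an assumption for illustration, while BaseShuffleHandle is Spark's own:

    import java.util.Locale

    // Any handle whose fully-qualified class name contains "celeborn" matches:
    val celebornHandle = "org.apache.spark.shuffle.celeborn.CelebornShuffleHandle" // assumed name
    assert(celebornHandle.toLowerCase(Locale.ROOT).contains("celeborn"))
    // Spark's built-in handles do not:
    val sortHandle = "org.apache.spark.shuffle.BaseShuffleHandle"
    assert(!sortHandle.toLowerCase(Locale.ROOT).contains("celeborn"))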
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
index 88abe68197be..150699a84a38 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
@@ -157,8 +157,10 @@ object OptimizeSkewedJoin extends AQEShuffleReadRule {
           Seq(CoalescedPartitionSpec(partitionIndex, partitionIndex + 1, rightSize))
 
       val leftParts = if (isLeftSkew) {
+        val isCelebornShuffle = CelebornShuffleUtil.isCelebornShuffle(left.shuffle)
         val skewSpecs = ShufflePartitionsUtil.createSkewPartitionSpecs(
-          left.mapStats.get.shuffleId, partitionIndex, leftTargetSize)
+          left.mapStats.get.shuffleId, partitionIndex, leftTargetSize,
+          isCelebornShuffle = isCelebornShuffle)
         if (skewSpecs.isDefined) {
           logDebug(s"Left side partition $partitionIndex " +
             s"(${FileUtils.byteCountToDisplaySize(leftSize)}) is skewed, " +
@@ -171,8 +173,10 @@
       }
 
       val rightParts = if (isRightSkew) {
+        val isCelebornShuffle = CelebornShuffleUtil.isCelebornShuffle(right.shuffle)
         val skewSpecs = ShufflePartitionsUtil.createSkewPartitionSpecs(
-          right.mapStats.get.shuffleId, partitionIndex, rightTargetSize)
+          right.mapStats.get.shuffleId, partitionIndex, rightTargetSize,
+          isCelebornShuffle = isCelebornShuffle)
         if (skewSpecs.isDefined) {
           logDebug(s"Right side partition $partitionIndex " +
             s"(${FileUtils.byteCountToDisplaySize(rightSize)}) is skewed, " +
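Both AQE rules thread the flag through as a defaulted parameter, so call sites that predate this patch keep compiling unchanged; the signature change itself is in the ShufflePartitionsUtil diff below. A sketch with illustrative identifiers:

    // Pre-existing callers are untouched because the new flag defaults to false:
    ShufflePartitionsUtil.createSkewPartitionSpecs(shuffleId, reducerId, targetSize)
    // The Celeborn-aware call sites in this patch opt in explicitly:
    ShufflePartitionsUtil.createSkewPartitionSpecs(
      shuffleId, reducerId, targetSize, isCelebornShuffle = true)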
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
index 3609548f3748..d34f43bf0647 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.adaptive
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, SparkEnv}
+import org.apache.spark.celeborn.CelebornShuffleState
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.execution.{CoalescedPartitionSpec, PartialReducerPartitionSpec, ShufflePartitionSpec}
 
@@ -376,11 +377,20 @@ object ShufflePartitionsUtil extends Logging {
   def createSkewPartitionSpecs(
       shuffleId: Int,
       reducerId: Int,
-      targetSize: Long): Option[Seq[PartialReducerPartitionSpec]] = {
+      targetSize: Long,
+      isCelebornShuffle: Boolean = false): Option[Seq[PartialReducerPartitionSpec]] = {
     val mapPartitionSizes = getMapSizesForReduceId(shuffleId, reducerId)
     if (mapPartitionSizes.exists(_ < 0)) return None
     val mapStartIndices = splitSizeListByTargetSize(mapPartitionSizes, targetSize)
     if (mapStartIndices.length > 1) {
+      val celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled =
+        CelebornShuffleState.celebornAdaptiveOptimizeSkewedPartitionReadEnabled && isCelebornShuffle
+
+      val throwsFetchFailure = CelebornShuffleState.celebornStageRerunEnabled
+      if (throwsFetchFailure && celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) {
+        logInfo(s"Celeborn shuffle retry enabled and shuffle $shuffleId is skewed")
+        CelebornShuffleState.registerCelebornSkewedShuffle(shuffleId)
+      }
       Some(mapStartIndices.indices.map { i =>
         val startMapIndex = mapStartIndices(i)
         val endMapIndex = if (i == mapStartIndices.length - 1) {
@@ -388,8 +398,21 @@
         } else {
           mapStartIndices(i + 1)
         }
-        val dataSize = startMapIndex.until(endMapIndex).map(mapPartitionSizes(_)).sum
-        PartialReducerPartitionSpec(reducerId, startMapIndex, endMapIndex, dataSize)
+        var dataSize = 0L
+        var mapIndex = startMapIndex
+        while (mapIndex < endMapIndex) {
+          dataSize += mapPartitionSizes(mapIndex)
+          mapIndex += 1
+        }
+
+        if (celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) {
+          // When the Celeborn optimize skewed partition read feature is enabled, the map-range
+          // fields below carry the split count and the split index instead, so this `dataSize`
+          // may not be accurate. Do not use these dataSize values elsewhere in the codebase.
+          PartialReducerPartitionSpec(reducerId, mapStartIndices.length, i, dataSize)
+        } else {
+          PartialReducerPartitionSpec(reducerId, startMapIndex, endMapIndex, dataSize)
+        }
       })
     } else {
       None
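The branch at the end of createSkewPartitionSpecs overloads the spec's fields rather than introducing a new spec type. A side-by-side sketch of the two encodings, with the variable names taken from the hunk above; how the second form is consumed is an assumption about the Celeborn reader, which this patch does not include:

    // Vanilla Spark AQE: the spec names the half-open map-output range of this split.
    PartialReducerPartitionSpec(reducerId, startMapIndex, endMapIndex, dataSize)
    // Celeborn skewed-partition read: the same two slots instead carry the total
    // number of splits (mapStartIndices.length) and this split's index (i),
    // presumably decoded by the Celeborn shuffle reader on the other side.
    PartialReducerPartitionSpec(reducerId, mapStartIndices.length, i, dataSize)

Keeping PartialReducerPartitionSpec unchanged avoids touching the spec hierarchy and serialization, at the cost of the field-meaning caveat documented in the comment above.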