From 893fe64007960eb4674004d715336c955fe890fc Mon Sep 17 00:00:00 2001 From: "chenliang.lu" Date: Mon, 23 Dec 2024 19:29:12 +0800 Subject: [PATCH 01/12] [SPARK-50648] [core] when the job is cancelled during shuffle retry in parent stage, might leave behind zombie tasks --- .../org/apache/spark/scheduler/DAGScheduler.scala | 4 +++- .../org/apache/spark/scheduler/TaskScheduler.scala | 2 ++ .../org/apache/spark/scheduler/TaskSchedulerImpl.scala | 10 ++++++++++ .../org/apache/spark/scheduler/DAGSchedulerSuite.scala | 2 ++ .../spark/scheduler/ExternalClusterManagerSuite.scala | 1 + 5 files changed, 18 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 4f7338f74e298..e2180fb5ef0b6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -2937,7 +2937,9 @@ private[spark] class DAGScheduler( } else { // This stage is only used by the job, so finish the stage if it is running. val stage = stageIdToStage(stageId) - if (runningStages.contains(stage)) { + val isRunningStage = runningStages.contains(stage) || + (waitingStages.contains(stage) && taskScheduler.hasRunningTasks(stageId)) + if (isRunningStage) { try { // killAllTaskAttempts will fail if a SchedulerBackend does not implement killTask taskScheduler.killAllTaskAttempts(stageId, shouldInterruptTaskThread(job), reason) if (legacyAbortStageAfterKillTasks) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index 1e6de9ef46f34..93dddcb5a22d3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -121,4 +121,6 @@ private[spark] trait TaskScheduler { */ def applicationAttemptId(): Option[String] + + def hasRunningTasks(stageId: Int): Boolean } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 8e3cb1379339d..7407438ef6e78 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -1212,6 +1212,16 @@ private[spark] class TaskSchedulerImpl( override def applicationAttemptId(): Option[String] = backend.applicationAttemptId() + override def hasRunningTasks(stageId: Int): Boolean = synchronized { + var hasRunningTasks = false + taskSetsByStageIdAndAttempt.get(stageId).foreach { attempts => + attempts.foreach { case (_, tsm) => + hasRunningTasks = hasRunningTasks || tsm.runningTasksSet.nonEmpty + } + } + hasRunningTasks + } + // exposed for testing private[scheduler] def taskSetManagerForAttempt( stageId: Int, diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 243d33fe55a79..f47b0fb9cb117 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -225,6 +225,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} override def workerRemoved(workerId: String, host: String, message: String): Unit = 
{} override def applicationAttemptId(): Option[String] = None + override def hasRunningTasks(stageId: Int): Boolean = false override def executorDecommission( executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit = { @@ -941,6 +942,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} override def workerRemoved(workerId: String, host: String, message: String): Unit = {} override def applicationAttemptId(): Option[String] = None + override def hasRunningTasks(stageId: Int): Boolean = false override def executorDecommission( executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit = {} diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala index 30973cc963fbd..398fb04bb9b8f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala @@ -95,6 +95,7 @@ private class DummyTaskScheduler extends TaskScheduler { override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} override def workerRemoved(workerId: String, host: String, message: String): Unit = {} override def applicationAttemptId(): Option[String] = None + override def hasRunningTasks(stageId: Int): Boolean = false def executorHeartbeatReceived( execId: String, accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], From a820cad141c91d1ad88c1a00553d0ae7dbd58e28 Mon Sep 17 00:00:00 2001 From: "chenliang.lu" Date: Tue, 24 Dec 2024 20:09:09 +0800 Subject: [PATCH 02/12] add a flag in Stage to indicate retry when shuffle failed --- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 9 +++++---- .../main/scala/org/apache/spark/scheduler/Stage.scala | 7 +++++++ 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index e2180fb5ef0b6..cb7e65e27b675 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -2187,7 +2187,8 @@ private[spark] class DAGScheduler( log"we will roll back and rerun below stages which include itself and all its " + log"indeterminate child stages: ${MDC(STAGES, rollingBackStages)}") } - + failedStage.markResubmitInFetchFailed() + mapStage.markResubmitInFetchFailed() // We expect one executor failure to trigger many FetchFailures in rapid succession, // but all of those task failures can typically be handled by a single resubmission of // the failed stage. We avoid flooding the scheduler's event queue with resubmit @@ -2937,9 +2938,9 @@ private[spark] class DAGScheduler( } else { // This stage is only used by the job, so finish the stage if it is running. 
val stage = stageIdToStage(stageId) - val isRunningStage = runningStages.contains(stage) || - (waitingStages.contains(stage) && taskScheduler.hasRunningTasks(stageId)) - if (isRunningStage) { + val shouldKill = runningStages.contains(stage) || + (waitingStages.contains(stage) && stage.resubmitInFetchFailed) + if (shouldKill) { try { // killAllTaskAttempts will fail if a SchedulerBackend does not implement killTask taskScheduler.killAllTaskAttempts(stageId, shouldInterruptTaskThread(job), reason) if (legacyAbortStageAfterKillTasks) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala index f35beafd87480..7632e970122cb 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala @@ -71,6 +71,7 @@ private[scheduler] abstract class Stage( /** The ID to use for the next new attempt for this stage. */ private var nextAttemptId: Int = 0 private[scheduler] def getNextAttemptId: Int = nextAttemptId + private[scheduler] var _resubmitInFetchFailed: Boolean = false val name: String = callSite.shortForm val details: String = callSite.longForm @@ -96,6 +97,12 @@ private[scheduler] abstract class Stage( failedAttemptIds.clear() } + private[scheduler] def resubmitInFetchFailed: Boolean = _resubmitInFetchFailed + + private[scheduler] def markResubmitInFetchFailed() : Unit = { + _resubmitInFetchFailed = true + } + /** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */ def makeNewStageAttempt( numPartitionsToCompute: Int, From 1bcd4b681f22a188b4faf56c31e76ee0340f2f07 Mon Sep 17 00:00:00 2001 From: "chenliang.lu" Date: Wed, 25 Dec 2024 00:28:10 +0800 Subject: [PATCH 03/12] fix ut && add more detailed log --- .../org/apache/spark/scheduler/DAGSchedulerSuite.scala | 2 +- .../sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 8 ++++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index f47b0fb9cb117..6be6f40eea229 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -2250,7 +2250,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti // original result task 1.0 succeed runEvent(makeCompletionEvent(taskSets(1).tasks(1), Success, 42)) sc.listenerBus.waitUntilEmpty() - assert(completedStage === List(0, 1, 1, 0)) + assert(completedStage === List(0, 1, 1, 0, 1)) assert(scheduler.activeJobs.isEmpty) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index d298b98aaa8da..d41ab05c51a3e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -938,8 +938,12 @@ class AdaptiveQueryExecSuite val error = intercept[SparkException] { joined.collect() } - assert((Seq(error) ++ Option(error.getCause) ++ error.getSuppressed()).exists( - e => e.getMessage() != null && e.getMessage().contains("coalesce test error"))) + val errorMessages = (Seq(error) ++ Option(error.getCause) ++ error.getSuppressed()) + .filter(e => e.getMessage != 
null).map(e => e.getMessage) + assert(errorMessages.exists( + e => e.contains("coalesce test error")), + s"Error messages should contain `coalesce test error`, " + + s"error messages: $errorMessages") val adaptivePlan = joined.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec] From 7fb4bc139d3ee6f36f4c4481e12aeed633ac9a92 Mon Sep 17 00:00:00 2001 From: "chenliang.lu" Date: Thu, 26 Dec 2024 00:21:15 +0800 Subject: [PATCH 04/12] update comments && add ut --- .../apache/spark/scheduler/DAGScheduler.scala | 5 +- .../org/apache/spark/scheduler/Stage.scala | 10 ++-- .../spark/scheduler/TaskSchedulerImpl.scala | 8 +-- .../spark/scheduler/DAGSchedulerSuite.scala | 59 ++++++++++++++++++- 4 files changed, 68 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index cb7e65e27b675..fb7460189c6f0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1566,6 +1566,7 @@ private[spark] class DAGScheduler( addPySparkConfigsToProperties(stage, properties) runningStages += stage + stage.setResubmitByFetchFailure(false) // SparkListenerStageSubmitted should be posted before testing whether tasks are // serializable. If tasks are not serializable, a SparkListenerStageCompleted event // will be posted, which should always come after a corresponding SparkListenerStageSubmitted @@ -2187,8 +2188,8 @@ private[spark] class DAGScheduler( log"we will roll back and rerun below stages which include itself and all its " + log"indeterminate child stages: ${MDC(STAGES, rollingBackStages)}") } - failedStage.markResubmitInFetchFailed() - mapStage.markResubmitInFetchFailed() + failedStage.setResubmitByFetchFailure(true) + mapStage.setResubmitByFetchFailure(true) // We expect one executor failure to trigger many FetchFailures in rapid succession, // but all of those task failures can typically be handled by a single resubmission of // the failed stage. We avoid flooding the scheduler's event queue with resubmit diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala index 7632e970122cb..03da11f0a8641 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala @@ -71,7 +71,9 @@ private[scheduler] abstract class Stage( /** The ID to use for the next new attempt for this stage. */ private var nextAttemptId: Int = 0 private[scheduler] def getNextAttemptId: Int = nextAttemptId - private[scheduler] var _resubmitInFetchFailed: Boolean = false + + /** A flag indicates that the stage has been resubmitted and is waiting to be executed. 
*/ + private[scheduler] var _resubmitByFetchFailure: Boolean = false val name: String = callSite.shortForm val details: String = callSite.longForm @@ -97,10 +99,10 @@ private[scheduler] abstract class Stage( failedAttemptIds.clear() } - private[scheduler] def resubmitInFetchFailed: Boolean = _resubmitInFetchFailed + private[scheduler] def resubmitInFetchFailed: Boolean = _resubmitByFetchFailure - private[scheduler] def markResubmitInFetchFailed() : Unit = { - _resubmitInFetchFailed = true + private[scheduler] def setResubmitByFetchFailure(resubmitByFetchFailure: Boolean) : Unit = { + _resubmitByFetchFailure = resubmitByFetchFailure } /** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */ diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 7407438ef6e78..ae8552042dbbd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -1213,13 +1213,9 @@ private[spark] class TaskSchedulerImpl( override def applicationAttemptId(): Option[String] = backend.applicationAttemptId() override def hasRunningTasks(stageId: Int): Boolean = synchronized { - var hasRunningTasks = false - taskSetsByStageIdAndAttempt.get(stageId).foreach { attempts => - attempts.foreach { case (_, tsm) => - hasRunningTasks = hasRunningTasks || tsm.runningTasksSet.nonEmpty - } + taskSetsByStageIdAndAttempt.get(stageId).exists { attempts => + attempts.values.exists(_.runningTasksSet.nonEmpty) } - hasRunningTasks } // exposed for testing diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 6be6f40eea229..5702c753c27c5 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -34,8 +34,8 @@ import org.roaringbitmap.RoaringBitmap import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits} import org.scalatest.exceptions.TestFailedException import org.scalatest.time.SpanSugar._ - import org.apache.spark._ + import org.apache.spark.broadcast.BroadcastManager import org.apache.spark.executor.ExecutorMetrics import org.apache.spark.internal.config @@ -185,6 +185,8 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti private var firstInit: Boolean = _ /** Set of TaskSets the DAGScheduler has requested executed. */ val taskSets = scala.collection.mutable.Buffer[TaskSet]() + /** Track running tasks, key is the task's stageId of the task, value is the task's partitionId */ + var runningTaskInfos = new HashMap[Int, HashSet[Int]]() /** Stages for which the DAGScheduler has called TaskScheduler.killAllTaskAttempts(). 
*/ val cancelledStages = new HashSet[Int]() @@ -206,12 +208,16 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti // normally done by TaskSetManager taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch) taskSets += taskSet + val taskPartitionIds = new HashSet[Int]() + taskSet.tasks.foreach(task => taskPartitionIds += task.partitionId) + runningTaskInfos.put(taskSet.stageId, taskPartitionIds) } override def killTaskAttempt( taskId: Long, interruptThread: Boolean, reason: String): Boolean = false override def killAllTaskAttempts( stageId: Int, interruptThread: Boolean, reason: String): Unit = { cancelledStages += stageId + runningTaskInfos.remove(stageId) } override def notifyPartitionCompletion(stageId: Int, partitionId: Int): Unit = { taskSets.filter(_.stageId == stageId).lastOption.foreach { ts => @@ -394,6 +400,16 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti handleShuffleMergeFinalized(shuffleMapStage, shuffleMapStage.shuffleDep.shuffleMergeId) } } + + override private[scheduler] def handleTaskCompletion(event: CompletionEvent): Unit = { + super.handleTaskCompletion(event) + if (runningTaskInfos.contains(event.task.stageId)) { + runningTaskInfos(event.task.stageId) -= event.task.partitionId + if (runningTaskInfos(event.task.stageId).isEmpty) { + runningTaskInfos.remove(event.task.stageId) + } + } + } } override def beforeEach(): Unit = { @@ -2227,7 +2243,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti runEvent(makeCompletionEvent(taskSets(1).tasks(0), Success, 42)) // speculative result task 1.1 fetch failed val info = new TaskInfo( - 4, index = 1, attemptNumber = 1, partitionId = 1, 0L, "", "", TaskLocality.ANY, true) + 0, index = 1, attemptNumber = 1, partitionId = 1, 0L, "", "", TaskLocality.ANY, true) runEvent(makeCompletionEvent( taskSets(1).tasks(1), FetchFailed(makeBlockManagerId("hostA"), shuffleDep.shuffleId, 0L, 0, 1, "ignored"), @@ -2254,6 +2270,45 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti assert(scheduler.activeJobs.isEmpty) } + test("SPARK-50648: when job is cancelled during shuffle retry in parent stage, " + + "should kill all running tasks") { + val shuffleMapRdd = new MyRDD(sc, 2, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) + val reduceRdd = new MyRDD(sc, 2, List(shuffleDep)) + submit(reduceRdd, Array(0, 1)) + completeShuffleMapStageSuccessfully(0, 0, 2) + sc.listenerBus.waitUntilEmpty() + + val info = new TaskInfo( + 3, index = 1, attemptNumber = 1, + partitionId = taskSets(1).tasks(0).partitionId, 0L, "", "", TaskLocality.ANY, true) + // result task 0.0 fetch failed, but result task 1.0 is still running + runEvent(makeCompletionEvent(taskSets(1).tasks(0), + FetchFailed(makeBlockManagerId("hostA"), shuffleDep.shuffleId, 0L, 0, 1, "ignored"), + null, + Seq.empty, + Array.empty, + info)) + sc.listenerBus.waitUntilEmpty() + + Thread.sleep(DAGScheduler.RESUBMIT_TIMEOUT * 2) + // map stage is running by resubmitted, result stage is waiting + // (the origin result task 1.0 is still running) + assert(scheduler.runningStages.size == 1, "Map stage should be running") + val mapStage = scheduler.runningStages.head + assert(mapStage.id === 0) + assert(mapStage.latestInfo.failureReason.isEmpty) + assert(scheduler.waitingStages.size == 1, "Result stage should be waiting") + assert(runningTaskInfos(taskSets(1).stageId).size == 1, + "origin result task 1.0 should be running") + + // the 
origin result task 1.0 should be killed + scheduler.cancelAllJobs() + assert(runningTaskInfos.isEmpty) + assert(scheduler.runningStages.isEmpty) + assert(scheduler.waitingStages.isEmpty) + } + test("misbehaved accumulator should not crash DAGScheduler and SparkContext") { val acc = new LongAccumulator { override def add(v: java.lang.Long): Unit = throw new DAGSchedulerSuiteDummyException From 9a58f6052d57cbdf986b61a64a88095f9491a68d Mon Sep 17 00:00:00 2001 From: "chenliang.lu" Date: Thu, 26 Dec 2024 00:26:03 +0800 Subject: [PATCH 05/12] fix ut --- .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 5702c753c27c5..d947a793ebfc1 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -34,8 +34,8 @@ import org.roaringbitmap.RoaringBitmap import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits} import org.scalatest.exceptions.TestFailedException import org.scalatest.time.SpanSugar._ -import org.apache.spark._ +import org.apache.spark._ import org.apache.spark.broadcast.BroadcastManager import org.apache.spark.executor.ExecutorMetrics import org.apache.spark.internal.config @@ -2243,7 +2243,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti runEvent(makeCompletionEvent(taskSets(1).tasks(0), Success, 42)) // speculative result task 1.1 fetch failed val info = new TaskInfo( - 0, index = 1, attemptNumber = 1, partitionId = 1, 0L, "", "", TaskLocality.ANY, true) + 4, index = 1, attemptNumber = 1, partitionId = 1, 0L, "", "", TaskLocality.ANY, true) runEvent(makeCompletionEvent( taskSets(1).tasks(1), FetchFailed(makeBlockManagerId("hostA"), shuffleDep.shuffleId, 0L, 0, 1, "ignored"), From bc33ffbb728adb9d205a8e7618e225aae3199b81 Mon Sep 17 00:00:00 2001 From: "chenliang.lu" Date: Thu, 26 Dec 2024 00:33:54 +0800 Subject: [PATCH 06/12] fix ut --- .../org/apache/spark/scheduler/DAGSchedulerSuite.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index d947a793ebfc1..9c2d2103f530a 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -185,7 +185,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti private var firstInit: Boolean = _ /** Set of TaskSets the DAGScheduler has requested executed. */ val taskSets = scala.collection.mutable.Buffer[TaskSet]() - /** Track running tasks, key is the task's stageId of the task, value is the task's partitionId */ + /** Track running tasks, the key is the task's stageId , the value is the task's partitionId */ var runningTaskInfos = new HashMap[Int, HashSet[Int]]() /** Stages for which the DAGScheduler has called TaskScheduler.killAllTaskAttempts(). 
*/ @@ -231,7 +231,9 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} override def workerRemoved(workerId: String, host: String, message: String): Unit = {} override def applicationAttemptId(): Option[String] = None - override def hasRunningTasks(stageId: Int): Boolean = false + override def hasRunningTasks(stageId: Int): Boolean = { + runningTaskInfos.contains(stageId) && runningTaskInfos(stageId).nonEmpty + } override def executorDecommission( executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit = { From 4e0fd2f72453b41d04ccd5b94ec8c5be27265411 Mon Sep 17 00:00:00 2001 From: "chenliang.lu" Date: Thu, 26 Dec 2024 19:10:32 +0800 Subject: [PATCH 07/12] kill stage which has failedAttemptIds --- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 6 +----- .../main/scala/org/apache/spark/scheduler/Stage.scala | 9 --------- .../scala/org/apache/spark/scheduler/TaskScheduler.scala | 2 -- .../org/apache/spark/scheduler/TaskSchedulerImpl.scala | 6 ------ .../org/apache/spark/scheduler/DAGSchedulerSuite.scala | 8 +++----- .../spark/scheduler/ExternalClusterManagerSuite.scala | 1 - 6 files changed, 4 insertions(+), 28 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index fb7460189c6f0..9f27cfce634d4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1566,7 +1566,6 @@ private[spark] class DAGScheduler( addPySparkConfigsToProperties(stage, properties) runningStages += stage - stage.setResubmitByFetchFailure(false) // SparkListenerStageSubmitted should be posted before testing whether tasks are // serializable. If tasks are not serializable, a SparkListenerStageCompleted event // will be posted, which should always come after a corresponding SparkListenerStageSubmitted @@ -2188,8 +2187,6 @@ private[spark] class DAGScheduler( log"we will roll back and rerun below stages which include itself and all its " + log"indeterminate child stages: ${MDC(STAGES, rollingBackStages)}") } - failedStage.setResubmitByFetchFailure(true) - mapStage.setResubmitByFetchFailure(true) // We expect one executor failure to trigger many FetchFailures in rapid succession, // but all of those task failures can typically be handled by a single resubmission of // the failed stage. We avoid flooding the scheduler's event queue with resubmit @@ -2939,8 +2936,7 @@ private[spark] class DAGScheduler( } else { // This stage is only used by the job, so finish the stage if it is running. 
val stage = stageIdToStage(stageId) - val shouldKill = runningStages.contains(stage) || - (waitingStages.contains(stage) && stage.resubmitInFetchFailed) + val shouldKill = runningStages.contains(stage) || stage.failedAttemptIds.nonEmpty if (shouldKill) { try { // killAllTaskAttempts will fail if a SchedulerBackend does not implement killTask taskScheduler.killAllTaskAttempts(stageId, shouldInterruptTaskThread(job), reason) diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala index 03da11f0a8641..f35beafd87480 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala @@ -72,9 +72,6 @@ private[scheduler] abstract class Stage( private var nextAttemptId: Int = 0 private[scheduler] def getNextAttemptId: Int = nextAttemptId - /** A flag indicates that the stage has been resubmitted and is waiting to be executed. */ - private[scheduler] var _resubmitByFetchFailure: Boolean = false - val name: String = callSite.shortForm val details: String = callSite.longForm @@ -99,12 +96,6 @@ private[scheduler] abstract class Stage( failedAttemptIds.clear() } - private[scheduler] def resubmitInFetchFailed: Boolean = _resubmitByFetchFailure - - private[scheduler] def setResubmitByFetchFailure(resubmitByFetchFailure: Boolean) : Unit = { - _resubmitByFetchFailure = resubmitByFetchFailure - } - /** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */ def makeNewStageAttempt( numPartitionsToCompute: Int, diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index 93dddcb5a22d3..1e6de9ef46f34 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -121,6 +121,4 @@ private[spark] trait TaskScheduler { */ def applicationAttemptId(): Option[String] - - def hasRunningTasks(stageId: Int): Boolean } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index ae8552042dbbd..8e3cb1379339d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -1212,12 +1212,6 @@ private[spark] class TaskSchedulerImpl( override def applicationAttemptId(): Option[String] = backend.applicationAttemptId() - override def hasRunningTasks(stageId: Int): Boolean = synchronized { - taskSetsByStageIdAndAttempt.get(stageId).exists { attempts => - attempts.values.exists(_.runningTasksSet.nonEmpty) - } - } - // exposed for testing private[scheduler] def taskSetManagerForAttempt( stageId: Int, diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 9c2d2103f530a..1e4a5c56c94c9 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -231,9 +231,6 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} override def workerRemoved(workerId: String, host: String, message: String): Unit = {} override def applicationAttemptId(): Option[String] = 
None - override def hasRunningTasks(stageId: Int): Boolean = { - runningTaskInfos.contains(stageId) && runningTaskInfos(stageId).nonEmpty - } override def executorDecommission( executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit = { @@ -960,7 +957,6 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} override def workerRemoved(workerId: String, host: String, message: String): Unit = {} override def applicationAttemptId(): Option[String] = None - override def hasRunningTasks(stageId: Int): Boolean = false override def executorDecommission( executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit = {} @@ -2295,12 +2291,14 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti Thread.sleep(DAGScheduler.RESUBMIT_TIMEOUT * 2) // map stage is running by resubmitted, result stage is waiting - // (the origin result task 1.0 is still running) + // (the origin result task 1.0 in result stage is still running) assert(scheduler.runningStages.size == 1, "Map stage should be running") val mapStage = scheduler.runningStages.head assert(mapStage.id === 0) assert(mapStage.latestInfo.failureReason.isEmpty) assert(scheduler.waitingStages.size == 1, "Result stage should be waiting") + // tasks in map stage and origin result task 1.0 in result stage should be running + assert(runningTaskInfos.size == 2) assert(runningTaskInfos(taskSets(1).stageId).size == 1, "origin result task 1.0 should be running") diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala index 398fb04bb9b8f..30973cc963fbd 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala @@ -95,7 +95,6 @@ private class DummyTaskScheduler extends TaskScheduler { override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} override def workerRemoved(workerId: String, host: String, message: String): Unit = {} override def applicationAttemptId(): Option[String] = None - override def hasRunningTasks(stageId: Int): Boolean = false def executorHeartbeatReceived( execId: String, accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], From c589fe8438b76dc7fe7b3937625b9a3160c32246 Mon Sep 17 00:00:00 2001 From: "chenliang.lu" Date: Thu, 26 Dec 2024 19:11:29 +0800 Subject: [PATCH 08/12] kill stage which has failedAttemptIds --- .../src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 9f27cfce634d4..fa007ea29d306 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -2187,6 +2187,7 @@ private[spark] class DAGScheduler( log"we will roll back and rerun below stages which include itself and all its " + log"indeterminate child stages: ${MDC(STAGES, rollingBackStages)}") } + // We expect one executor failure to trigger many FetchFailures in rapid succession, // but all of those task failures can typically be handled by a single resubmission of // the failed stage. 
We avoid flooding the scheduler's event queue with resubmit From 8ebd0b1460367afd79d0b3bc164d1df24f3aa576 Mon Sep 17 00:00:00 2001 From: "chenliang.lu" Date: Thu, 26 Dec 2024 19:13:05 +0800 Subject: [PATCH 09/12] kill stage which has failedAttemptIds --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index fa007ea29d306..b91e1f1e9a340 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -2187,7 +2187,7 @@ private[spark] class DAGScheduler( log"we will roll back and rerun below stages which include itself and all its " + log"indeterminate child stages: ${MDC(STAGES, rollingBackStages)}") } - + // We expect one executor failure to trigger many FetchFailures in rapid succession, // but all of those task failures can typically be handled by a single resubmission of // the failed stage. We avoid flooding the scheduler's event queue with resubmit From 85c7c0223c6c58143fabf057cbdc44d6fe4eadb7 Mon Sep 17 00:00:00 2001 From: "chenliang.lu" Date: Thu, 26 Dec 2024 19:16:29 +0800 Subject: [PATCH 10/12] kill stage which has failedAttemptIds --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index b91e1f1e9a340..e06b7d86e1db0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -2937,8 +2937,8 @@ private[spark] class DAGScheduler( } else { // This stage is only used by the job, so finish the stage if it is running. 
val stage = stageIdToStage(stageId) - val shouldKill = runningStages.contains(stage) || stage.failedAttemptIds.nonEmpty - if (shouldKill) { + // Stages with failedAttemptIds may have tasks that are running + if (runningStages.contains(stage) || stage.failedAttemptIds.nonEmpty) { try { // killAllTaskAttempts will fail if a SchedulerBackend does not implement killTask taskScheduler.killAllTaskAttempts(stageId, shouldInterruptTaskThread(job), reason) if (legacyAbortStageAfterKillTasks) { From d2822ede7bb9959e7e184b73a6059232190cc34b Mon Sep 17 00:00:00 2001 From: "chenliang.lu" Date: Thu, 26 Dec 2024 19:29:10 +0800 Subject: [PATCH 11/12] update ut --- .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 1e4a5c56c94c9..0260f05761e57 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -2291,19 +2291,18 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti Thread.sleep(DAGScheduler.RESUBMIT_TIMEOUT * 2) // map stage is running by resubmitted, result stage is waiting - // (the origin result task 1.0 in result stage is still running) + // map tasks and the origin result task 1.0 are running assert(scheduler.runningStages.size == 1, "Map stage should be running") val mapStage = scheduler.runningStages.head assert(mapStage.id === 0) assert(mapStage.latestInfo.failureReason.isEmpty) assert(scheduler.waitingStages.size == 1, "Result stage should be waiting") - // tasks in map stage and origin result task 1.0 in result stage should be running assert(runningTaskInfos.size == 2) assert(runningTaskInfos(taskSets(1).stageId).size == 1, "origin result task 1.0 should be running") - // the origin result task 1.0 should be killed scheduler.cancelAllJobs() + // all tasks should be killed assert(runningTaskInfos.isEmpty) assert(scheduler.runningStages.isEmpty) assert(scheduler.waitingStages.isEmpty) From a37c667a7415627ac0082696ffe77b8cf361b812 Mon Sep 17 00:00:00 2001 From: "chenliang.lu" Date: Thu, 26 Dec 2024 22:45:55 +0800 Subject: [PATCH 12/12] update ut --- .../sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index d41ab05c51a3e..d298b98aaa8da 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -938,12 +938,8 @@ class AdaptiveQueryExecSuite val error = intercept[SparkException] { joined.collect() } - val errorMessages = (Seq(error) ++ Option(error.getCause) ++ error.getSuppressed()) - .filter(e => e.getMessage != null).map(e => e.getMessage) - assert(errorMessages.exists( - e => e.contains("coalesce test error")), - s"Error messages should contain `coalesce test error`, " + - s"error messages: $errorMessages") + assert((Seq(error) ++ Option(error.getCause) ++ error.getSuppressed()).exists( + e => e.getMessage() != null && e.getMessage().contains("coalesce test error"))) val adaptivePlan = 
joined.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
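
Taken together, the series converges on a small change in the DAGScheduler job-cancellation path (patches 07-10): kill task attempts not only for stages in runningStages, but also for any stage whose failedAttemptIds is non-empty, because a stage that hit a FetchFailure is moved back to waiting while tasks from its previous attempt may still be running as zombie tasks. The standalone Scala sketch below only models that condition outside of Spark; SketchStage, runningTasks, and cancelStage are simplified, hypothetical stand-ins for illustration, not Spark's actual classes or APIs.

import scala.collection.mutable

object CancelZombieTasksSketch {
  // Simplified stand-in for a Spark stage: only the fields this sketch needs.
  final case class SketchStage(id: Int, failedAttemptIds: mutable.Set[Int])

  private val runningStages = mutable.Set.empty[SketchStage]
  private val waitingStages = mutable.Set.empty[SketchStage]
  // stageId -> partition ids of tasks still running (possibly from an old attempt)
  private val runningTasks = mutable.Map.empty[Int, mutable.Set[Int]]

  private def killAllTaskAttempts(stageId: Int): Unit = runningTasks.remove(stageId)

  // Mirrors the final condition from the series: a stage has its task attempts
  // killed if it is currently running, or if it has failed attempts, since a
  // resubmitted stage sits in waitingStages while tasks from its previous
  // attempt may still be alive.
  private def cancelStage(stage: SketchStage): Unit = {
    if (runningStages.contains(stage) || stage.failedAttemptIds.nonEmpty) {
      killAllTaskAttempts(stage.id)
    }
    runningStages -= stage
    waitingStages -= stage
  }

  def main(args: Array[String]): Unit = {
    // Result stage 1 hit a FetchFailure: its map stage 0 was resubmitted and is
    // running again, while result task 1.0 from the old attempt is still alive.
    val mapStage = SketchStage(0, mutable.Set.empty)
    val resultStage = SketchStage(1, mutable.Set(0))
    runningStages += mapStage
    waitingStages += resultStage
    runningTasks(0) = mutable.Set(0, 1)
    runningTasks(1) = mutable.Set(1) // the would-be zombie task

    Seq(mapStage, resultStage).foreach(cancelStage)
    assert(runningTasks.isEmpty, "no zombie tasks should survive job cancellation")
    println(s"running tasks after cancellation: $runningTasks")
  }
}

The patch history also shows why this shape was chosen: patch 01 asked the TaskScheduler directly via a new hasRunningTasks method, patch 02 tracked a resubmit flag on Stage, and patches 07-10 dropped both in favor of the existing failedAttemptIds state, which already records that a stage was resubmitted after a failure and requires no new scheduler API.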