[SPARK-50648][CORE] When a job is cancelled during a shuffle retry in a parent stage, it might leave behind zombie running tasks #49270

Open · wants to merge 6 commits into base: master
@@ -2937,7 +2937,9 @@ private[spark] class DAGScheduler(
       } else {
         // This stage is only used by the job, so finish the stage if it is running.
         val stage = stageIdToStage(stageId)
-        if (runningStages.contains(stage)) {
+        val isRunningStage = runningStages.contains(stage) ||
+          (waitingStages.contains(stage) && taskScheduler.hasRunningTasks(stageId))
Contributor:
what if we just kill all waiting stages? Does taskScheduler.killAllTaskAttempts handle it well?

Contributor:
and can we add a special flag to indicate the waiting stages that are submitted due to retry?

Contributor Author (@yabola), Dec 24, 2024:

> what if we just kill all waiting stages? Does taskScheduler.killAllTaskAttempts handle it well?

I tested this: if killAllTaskAttempts is called on normally generated waiting stages, their status is displayed as FAILED (previously it would have been SKIPPED); killAllTaskAttempts itself does not go wrong.
Always killing waiting stages seems to be the safer approach (those tasks really shouldn't run anymore), but compared with the current behavior it may generate unnecessary stageFailed events.

> and can we add a special flag to indicate the waiting stages that are submitted due to retry?

Yes, we can add a flag; please see the updated code.
Actually, there is a trade-off here in deciding which waiting stages to kill. The range of choices, from broadest to narrowest:

  • kill all waiting stages
  • kill waiting stages that have already failed (stage#failedAttemptIds > 0)
  • kill waiting stages that were resubmitted on a fetch failure (stage#resubmitInFetchFailed); see the sketch after this list
  • kill only waiting stages that still have running tasks (this might not be enough?)
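
A minimal, self-contained sketch of option 3, just to make the trade-off concrete. The Stage case class and the resubmitInFetchFailed flag below are hypothetical stand-ins for illustration, not the types or field names used in this PR:

  // Hypothetical sketch of option 3: treat a waiting stage as "running" only when it
  // was resubmitted after a fetch failure. `resubmitInFetchFailed` is an assumed flag.
  object KillWaitingStageSketch {
    final case class Stage(id: Int, resubmitInFetchFailed: Boolean)

    def shouldKillStage(
        stage: Stage,
        runningStages: Set[Stage],
        waitingStages: Set[Stage]): Boolean =
      runningStages.contains(stage) ||
        // A waiting stage resubmitted after a FetchFailed may still have zombie
        // tasks from its previous attempt, so treat it like a running stage.
        (waitingStages.contains(stage) && stage.resubmitInFetchFailed)

    def main(args: Array[String]): Unit = {
      val retried = Stage(id = 3, resubmitInFetchFailed = true)
      // A retried stage that is back in the waiting set should still be killed.
      println(shouldKillStage(retried, runningStages = Set.empty, waitingStages = Set(retried)))
    }
  }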

+        if (isRunningStage) {
           try { // killAllTaskAttempts will fail if a SchedulerBackend does not implement killTask
             taskScheduler.killAllTaskAttempts(stageId, shouldInterruptTaskThread(job), reason)
             if (legacyAbortStageAfterKillTasks) {
@@ -121,4 +121,6 @@ private[spark] trait TaskScheduler {
    */
   def applicationAttemptId(): Option[String]

+
+  def hasRunningTasks(stageId: Int): Boolean

Contributor:
Why do we need this method?

Contributor Author (@yabola), Dec 25, 2024:
Please see this.
This is the code I wrote at the beginning (kill only the waiting stages that still have running tasks).
I'm not sure which approach is better; I will delete this method if we settle on one.

}
@@ -1212,6 +1212,16 @@ private[spark] class TaskSchedulerImpl(

   override def applicationAttemptId(): Option[String] = backend.applicationAttemptId()

+  override def hasRunningTasks(stageId: Int): Boolean = synchronized {
+    var hasRunningTasks = false
+    taskSetsByStageIdAndAttempt.get(stageId).foreach { attempts =>
+      attempts.foreach { case (_, tsm) =>
+        hasRunningTasks = hasRunningTasks || tsm.runningTasksSet.nonEmpty
+      }
+    }
+    hasRunningTasks
+  }
+
   // exposed for testing
   private[scheduler] def taskSetManagerForAttempt(
       stageId: Int,
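
As a side note for the review, not part of the PR: the same check could be expressed without the mutable accumulator. A minimal sketch, assuming taskSetsByStageIdAndAttempt keeps its HashMap[Int, HashMap[Int, TaskSetManager]] shape inside TaskSchedulerImpl:

  // Sketch of an equivalent, flag-free formulation (assumption, not the PR's code).
  override def hasRunningTasks(stageId: Int): Boolean = synchronized {
    taskSetsByStageIdAndAttempt.get(stageId).exists { attempts =>
      // True if any attempt of this stage still has at least one running task.
      attempts.values.exists(_.runningTasksSet.nonEmpty)
    }
  }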
@@ -225,6 +225,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
     override def workerRemoved(workerId: String, host: String, message: String): Unit = {}
     override def applicationAttemptId(): Option[String] = None
+    override def hasRunningTasks(stageId: Int): Boolean = false
     override def executorDecommission(
         executorId: String,
         decommissionInfo: ExecutorDecommissionInfo): Unit = {
@@ -941,6 +942,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
     override def workerRemoved(workerId: String, host: String, message: String): Unit = {}
     override def applicationAttemptId(): Option[String] = None
+    override def hasRunningTasks(stageId: Int): Boolean = false
     override def executorDecommission(
         executorId: String,
         decommissionInfo: ExecutorDecommissionInfo): Unit = {}
@@ -95,6 +95,7 @@ private class DummyTaskScheduler extends TaskScheduler {
   override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
   override def workerRemoved(workerId: String, host: String, message: String): Unit = {}
   override def applicationAttemptId(): Option[String] = None
+  override def hasRunningTasks(stageId: Int): Boolean = false
   def executorHeartbeatReceived(
       execId: String,
       accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],