about summary refs log tree commit diff
path: root/core/src/main/scala
diff options
context:
space:
mode:
authorliujianhui <liujianhui@didichuxing>2017-03-28 12:13:45 -0700
committerKay Ousterhout <kayousterhout@gmail.com>2017-03-28 12:13:45 -0700
commit92e385e0b55d70a48411e90aa0f2ed141c4d07c8 (patch)
tree24c3e11cd191b210684d2b8ce9a2970d3311852b /core/src/main/scala
parentd4fac410e0554b7ccd44be44b7ce2fe07ed7f206 (diff)
downloadspark-92e385e0b55d70a48411e90aa0f2ed141c4d07c8.tar.gz
spark-92e385e0b55d70a48411e90aa0f2ed141c4d07c8.tar.bz2
spark-92e385e0b55d70a48411e90aa0f2ed141c4d07c8.zip
[SPARK-19868] Conflicting TaskSetManagers lead to Spark being stopped
## What changes were proposed in this pull request? We must set the taskset to zombie before the DAGScheduler handles the taskEnded event. It's possible the taskEnded event will cause the DAGScheduler to launch a new stage attempt (this happens when map output data was lost), and if this happens before the taskSet has been set to zombie, it will appear that we have conflicting task sets. Author: liujianhui <liujianhui@didichuxing> Closes #17208 from liujianhuiouc/spark-19868.
Diffstat (limited to 'core/src/main/scala')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala15
1 file changed, 8 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index a177aab5f9..a41b059fa7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -713,13 +713,7 @@ private[spark] class TaskSetManager(
successfulTaskDurations.insert(info.duration)
}
removeRunningTask(tid)
- // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
- // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
- // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
- // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
- // Note: "result.value()" only deserializes the value when it's called at the first time, so
- // here "result.value()" just returns the value and won't block other threads.
- sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
+
// Kill any other attempts for the same task (since those are unnecessary now that one
// attempt completed successfully).
for (attemptInfo <- taskAttempts(index) if attemptInfo.running) {
@@ -746,6 +740,13 @@ private[spark] class TaskSetManager(
logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
" because task " + index + " has already completed successfully")
}
+ // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
+ // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
+ // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
+ // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
+ // Note: "result.value()" only deserializes the value when it's called at the first time, so
+ // here "result.value()" just returns the value and won't block other threads.
+ sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
maybeFinishTaskSet()
}