aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2013-11-14 22:29:28 -0800
committerMatei Zaharia <matei@eecs.berkeley.edu>2013-11-14 22:29:28 -0800
commit96e0fb46309698b685c811a65bd8e1a691389994 (patch)
treefc4d83b2a010f78bc989321835b2c49adf827ab6 /core
parentdfd40e9f6f87ff1f205944997cdbbb6bb7f0312c (diff)
parentb4546ba9e694529c359b7ca5c26829ead2c07f1a (diff)
downloadspark-96e0fb46309698b685c811a65bd8e1a691389994.tar.gz
spark-96e0fb46309698b685c811a65bd8e1a691389994.tar.bz2
spark-96e0fb46309698b685c811a65bd8e1a691389994.zip
Merge pull request #173 from kayousterhout/scheduler_hang
Fix bug where scheduler could hang after task failure. When a task fails, we need to call reviveOffers() so that the task can be rescheduled on a different machine. In the current code, the state in ClusterTaskSetManager indicating which tasks are pending may be updated after revive offers is called (there's a race condition here), so when revive offers is called, the task set manager does not yet realize that there are failed tasks that need to be relaunched. This isn't currently unit tested but will be once my pull request for merging the cluster and local schedulers goes in -- at which point many more of the unit tests will exercise the code paths through the cluster scheduler (currently the failure test suite uses the local scheduler, which is why we didn't see this bug before).
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala13
1 files changed, 3 insertions, 10 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
index 53a589615d..c1e65a3c48 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
@@ -249,7 +249,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
var failedExecutor: Option[String] = None
- var taskFailed = false
synchronized {
try {
if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
@@ -269,9 +268,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
taskIdToExecutorId.remove(tid)
}
- if (state == TaskState.FAILED) {
- taskFailed = true
- }
activeTaskSets.get(taskSetId).foreach { taskSet =>
if (state == TaskState.FINISHED) {
taskSet.removeRunningTask(tid)
@@ -293,10 +289,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
dagScheduler.executorLost(failedExecutor.get)
backend.reviveOffers()
}
- if (taskFailed) {
- // Also revive offers if a task had failed for some reason other than host lost
- backend.reviveOffers()
- }
}
def handleTaskGettingResult(taskSetManager: ClusterTaskSetManager, tid: Long) {
@@ -316,8 +308,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
taskState: TaskState,
reason: Option[TaskEndReason]) = synchronized {
taskSetManager.handleFailedTask(tid, taskState, reason)
- if (taskState == TaskState.FINISHED) {
- // The task finished successfully but the result was lost, so we should revive offers.
+ if (taskState != TaskState.KILLED) {
+ // Need to revive offers again now that the task set manager state has been updated to
+ // reflect failed tasks that need to be re-run.
backend.reviveOffers()
}
}