| | | |
|---|---|---|
| author | Kay Ousterhout <kayousterhout@gmail.com> | 2013-11-14 13:33:11 -0800 |
| committer | Kay Ousterhout <kayousterhout@gmail.com> | 2013-11-14 13:55:03 -0800 |
| commit | b4546ba9e694529c359b7ca5c26829ead2c07f1a (patch) | |
| tree | 64478157137a4ed39c4954fbfb05c7ca2cd4cd3b | |
| parent | 1a4cfbea334c7b0dae287eab4c3131c8f4b8a992 (diff) | |
| download | spark-b4546ba9e694529c359b7ca5c26829ead2c07f1a.tar.gz, spark-b4546ba9e694529c359b7ca5c26829ead2c07f1a.tar.bz2, spark-b4546ba9e694529c359b7ca5c26829ead2c07f1a.zip | |
Fix bug where scheduler could hang after task failure.
When a task fails, we need to call reviveOffers() so that the
task can be rescheduled on a different machine. In the current code,
the state in ClusterTaskSetManager that tracks which tasks are
pending may be updated after reviveOffers() is called (a race
condition), so at the time offers are revived, the task set
manager does not yet realize that there are failed tasks that need
to be relaunched.
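To make the hazard concrete, here is a minimal Scala sketch of the ordering bug. The names (`SchedulerRaceSketch`, `pendingTasks`, `resultGetter`) are simplified stand-ins, not Spark's actual classes; the point is only that reviving offers before the failed task is marked pending lets the offer round find nothing to schedule, so the job can hang if no later event triggers another round.

```scala
import java.util.concurrent.Executors

// Minimal sketch of the race, with simplified stand-ins for the real
// scheduler classes. In the buggy ordering, statusUpdate() revives offers
// immediately, while the bookkeeping that makes the failed task schedulable
// again runs later on a separate result-getter thread.
object SchedulerRaceSketch {
  @volatile private var pendingTasks = Set.empty[Long] // tasks awaiting (re)launch

  // Stand-in for backend.reviveOffers(): offers resources to pending tasks.
  private def reviveOffers(): Unit =
    println(s"offering resources; pending tasks = $pendingTasks")

  private val resultGetter = Executors.newSingleThreadExecutor()

  def statusUpdate(tid: Long, failed: Boolean): Unit = {
    if (failed) {
      // Buggy ordering: this offer round may run before tid is pending
      // again, so nothing gets scheduled.
      reviveOffers()
      // The state update happens asynchronously, after the offers were made.
      resultGetter.submit(new Runnable {
        def run(): Unit = handleFailedTask(tid)
      })
    }
  }

  private def handleFailedTask(tid: Long): Unit = synchronized {
    pendingTasks += tid // only now is tid schedulable again
    // The fix: revive offers here, after the state reflects the failure.
    reviveOffers()
  }
}
```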
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala | 13 |
1 file changed, 3 insertions(+), 10 deletions(-)
```diff
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
index 53a589615d..c1e65a3c48 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
@@ -249,7 +249,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
   def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
     var failedExecutor: Option[String] = None
-    var taskFailed = false
     synchronized {
       try {
         if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
@@ -269,9 +268,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
               }
               taskIdToExecutorId.remove(tid)
             }
-            if (state == TaskState.FAILED) {
-              taskFailed = true
-            }
             activeTaskSets.get(taskSetId).foreach { taskSet =>
               if (state == TaskState.FINISHED) {
                 taskSet.removeRunningTask(tid)
@@ -293,10 +289,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
       dagScheduler.executorLost(failedExecutor.get)
       backend.reviveOffers()
     }
-    if (taskFailed) {
-      // Also revive offers if a task had failed for some reason other than host lost
-      backend.reviveOffers()
-    }
   }
 
   def handleTaskGettingResult(taskSetManager: ClusterTaskSetManager, tid: Long) {
@@ -316,8 +308,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
       taskState: TaskState,
       reason: Option[TaskEndReason]) = synchronized {
     taskSetManager.handleFailedTask(tid, taskState, reason)
-    if (taskState == TaskState.FINISHED) {
-      // The task finished successfully but the result was lost, so we should revive offers.
+    if (taskState != TaskState.KILLED) {
+      // Need to revive offers again now that the task set manager state has been updated to
+      // reflect failed tasks that need to be re-run.
       backend.reviveOffers()
     }
   }
```
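One detail worth noting in the last hunk: the new guard revives offers for every state except KILLED. The likely reasoning (not stated in the commit itself) is that a killed task is not re-run, so there is nothing new to offer resources to, whereas FAILED and LOST tasks must be relaunched, and a FINISHED task only reaches handleFailedTask when its result was lost (as the old comment noted), so it also needs to be re-run.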