author     Kay Ousterhout <kayousterhout@gmail.com>    2013-11-14 13:33:11 -0800
committer  Kay Ousterhout <kayousterhout@gmail.com>    2013-11-14 13:33:11 -0800
commit     2b807e4f2f853a9b1e8cba5147d182e7b05022bc (patch)
tree       3a6b46974dca8bfc2919f25dee42ecead207f6a6 /core/src/main/scala/org/apache
parent     c64690d7252248df97bbe4b2bef8f540b977842d (diff)
Fix bug where scheduler could hang after task failure.
When a task fails, we need to call reviveOffers() so that the task can be rescheduled on a different machine. In the current code there is a race condition: the state in ClusterTaskSetManager indicating which tasks are pending may only be updated after reviveOffers() is called, so when the offers are revived the task set manager does not yet realize that there are failed tasks that need to be relaunched.
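
To make the intended ordering concrete, here is a minimal, self-contained sketch of the behavior the patch enforces. The class and method names (ToyTaskSetManager, ToyBackend, ToyScheduler, statusUpdateFailed) are hypothetical and simplified, not Spark's real API: the point is only that the manager records the failed task as pending again before the backend is asked to revive offers, so the relaunch is visible when offers are made.

import scala.collection.mutable

// Toy stand-in for ClusterTaskSetManager: tracks tasks that still need to run.
class ToyTaskSetManager {
  private val pendingTasks = mutable.Queue[Long]()

  // Record a failed task as pending again so it can be relaunched.
  def handleFailedTask(tid: Long): Unit = synchronized {
    pendingTasks.enqueue(tid)
  }

  // Return a pending task to launch on an offered slot, if any.
  def resourceOffer(): Option[Long] = synchronized {
    if (pendingTasks.nonEmpty) Some(pendingTasks.dequeue()) else None
  }
}

// Toy stand-in for the scheduler backend: reviving offers triggers a new round of matching.
class ToyBackend(scheduler: ToyScheduler) {
  def reviveOffers(): Unit = scheduler.makeOffers()
}

class ToyScheduler {
  val manager = new ToyTaskSetManager
  private val backend = new ToyBackend(this)

  def makeOffers(): Unit =
    manager.resourceOffer().foreach(tid => println(s"relaunching task $tid"))

  // Fixed ordering: the manager's pending-task state is updated *before*
  // offers are revived, so the failed task is seen when makeOffers() runs.
  def statusUpdateFailed(tid: Long): Unit = {
    manager.handleFailedTask(tid)   // update pending state first
    backend.reviveOffers()          // then ask for new offers
  }
}

object ReviveOrderingSketch extends App {
  val scheduler = new ToyScheduler
  scheduler.statusUpdateFailed(7L)  // prints: relaunching task 7
}
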
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ClusterScheduler.scala | 13
1 file changed, 3 insertions, 10 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/ClusterScheduler.scala
index 37d554715d..2e4ba53d9b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ClusterScheduler.scala
@@ -250,7 +250,6 @@ private[spark] class ClusterScheduler(
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
var failedExecutor: Option[String] = None
- var taskFailed = false
synchronized {
try {
if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
@@ -270,9 +269,6 @@ private[spark] class ClusterScheduler(
}
taskIdToExecutorId.remove(tid)
}
- if (state == TaskState.FAILED) {
- taskFailed = true
- }
activeTaskSets.get(taskSetId).foreach { taskSet =>
if (state == TaskState.FINISHED) {
taskSet.removeRunningTask(tid)
@@ -294,10 +290,6 @@ private[spark] class ClusterScheduler(
dagScheduler.executorLost(failedExecutor.get)
backend.reviveOffers()
}
- if (taskFailed) {
- // Also revive offers if a task had failed for some reason other than host lost
- backend.reviveOffers()
- }
}
def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long) {
@@ -317,8 +309,9 @@ private[spark] class ClusterScheduler(
taskState: TaskState,
reason: Option[TaskEndReason]) = synchronized {
taskSetManager.handleFailedTask(tid, taskState, reason)
- if (taskState == TaskState.FINISHED) {
- // The task finished successfully but the result was lost, so we should revive offers.
+ if (taskState != TaskState.KILLED) {
+ // Need to revive offers again now that the task set manager state has been updated to
+ // reflect failed tasks that need to be re-run.
backend.reviveOffers()
}
}