author     Kay Ousterhout <kayousterhout@gmail.com>  2013-11-14 13:33:11 -0800
committer  Kay Ousterhout <kayousterhout@gmail.com>  2013-11-14 13:55:03 -0800
commit     b4546ba9e694529c359b7ca5c26829ead2c07f1a (patch)
tree       64478157137a4ed39c4954fbfb05c7ca2cd4cd3b
parent     1a4cfbea334c7b0dae287eab4c3131c8f4b8a992 (diff)
Fix bug where scheduler could hang after task failure.
When a task fails, we need to call reviveOffers() so that the task can be rescheduled on a different machine. In the current code, the state in ClusterTaskSetManager indicating which tasks are pending may be updated after reviveOffers() is called (there is a race condition here), so when the offers are revived, the task set manager does not yet realize that there are failed tasks that need to be relaunched.
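To make the ordering concrete, here is a minimal, hypothetical Scala sketch (TaskSetManagerStub and SchedulerBackend are stand-ins, not the real Spark classes) of the flow the patch enforces: the task set manager records the failed task first, and reviveOffers() is called afterwards on the same code path, so the revived offers cannot race ahead of the state update.

```scala
// Hypothetical sketch of the ordering the fix enforces: record the failure in the
// task set manager first, then revive offers, so the offers see the pending work.
object ReviveOrderingSketch {

  // Stand-in for the scheduler backend; only reviveOffers() matters here.
  trait SchedulerBackend {
    def reviveOffers(): Unit
  }

  // Stand-in for ClusterTaskSetManager's pending-task bookkeeping.
  class TaskSetManagerStub {
    private var pendingTasks = Set.empty[Long]

    def handleFailedTask(tid: Long): Unit = {
      pendingTasks += tid // the failed task becomes pending again
    }

    def hasPendingTasks: Boolean = pendingTasks.nonEmpty
  }

  // Fixed flow: update the manager, then revive offers in the same path, instead
  // of setting a taskFailed flag in statusUpdate and racing with the asynchronous
  // result handling that updates the manager.
  def handleFailedTask(
      manager: TaskSetManagerStub,
      backend: SchedulerBackend,
      tid: Long,
      killed: Boolean): Unit = {
    manager.handleFailedTask(tid)     // 1. record the failure
    if (!killed) {
      backend.reviveOffers()          // 2. now the offers will find pending work
    }
  }
}
```

The diff below implements the same idea: the reviveOffers() call is moved out of statusUpdate (where it relied on a taskFailed flag) and into handleFailedTask, after the task set manager's state has been updated.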
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala | 13 +++----------
1 file changed, 3 insertions(+), 10 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
index 53a589615d..c1e65a3c48 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
@@ -249,7 +249,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
var failedExecutor: Option[String] = None
- var taskFailed = false
synchronized {
try {
if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
@@ -269,9 +268,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
taskIdToExecutorId.remove(tid)
}
- if (state == TaskState.FAILED) {
- taskFailed = true
- }
activeTaskSets.get(taskSetId).foreach { taskSet =>
if (state == TaskState.FINISHED) {
taskSet.removeRunningTask(tid)
@@ -293,10 +289,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
dagScheduler.executorLost(failedExecutor.get)
backend.reviveOffers()
}
- if (taskFailed) {
- // Also revive offers if a task had failed for some reason other than host lost
- backend.reviveOffers()
- }
}
def handleTaskGettingResult(taskSetManager: ClusterTaskSetManager, tid: Long) {
@@ -316,8 +308,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
taskState: TaskState,
reason: Option[TaskEndReason]) = synchronized {
taskSetManager.handleFailedTask(tid, taskState, reason)
- if (taskState == TaskState.FINISHED) {
- // The task finished successfully but the result was lost, so we should revive offers.
+ if (taskState != TaskState.KILLED) {
+ // Need to revive offers again now that the task set manager state has been updated to
+ // reflect failed tasks that need to be re-run.
backend.reviveOffers()
}
}