aboutsummaryrefslogtreecommitdiff
path: root/core/src/test
diff options
context:
space:
mode:
authorMarcelo Vanzin <vanzin@cloudera.com>2015-11-04 09:07:22 -0800
committerMarcelo Vanzin <vanzin@cloudera.com>2015-11-04 09:07:22 -0800
commit8790ee6d69e50ca84eb849742be48f2476743b5b (patch)
treed99e02c1bc02a92135c7da2f5745e57fd04663af /core/src/test
parent9b214cea896056e7d0a69ae9d3c282e1f027d5b9 (diff)
downloadspark-8790ee6d69e50ca84eb849742be48f2476743b5b.tar.gz
spark-8790ee6d69e50ca84eb849742be48f2476743b5b.tar.bz2
spark-8790ee6d69e50ca84eb849742be48f2476743b5b.zip
[SPARK-10622][CORE][YARN] Differentiate dead from "mostly dead" executors.
In YARN mode, when preemption is enabled, we may leave executors in a zombie state while we wait to retrieve the reason for which the executor exited. This is so that we don't account for failed tasks that were running on a preempted executor. The issue is that while we wait for this information, the scheduler might decide to schedule tasks on the executor, which will never be able to run them. Other side effects include the block manager still considering the executor available to cache blocks, for example. So, when we know that an executor went down but we don't know why, stop everything related to the executor, except its running tasks. Only when we know the reason for the exit (or give up waiting for it) we do update the running tasks. This is achieved by a new `disableExecutor()` method in the `Schedulable` interface. For managers that do not behave like this (i.e. every one but YARN), the existing `executorLost()` method will behave the same way it did before. On top of that change, a few minor changes that made debugging easier, and fixed some other minor issues: - The cluster-mode AM was printing a misleading log message every time an executor disconnected from the driver (because the akka actor system was shared between driver and AM). - Avoid sending unnecessary requests for an executor's exit reason when we already know it was explicitly disabled / killed. This avoids both multiple requests, and unnecessary requests that would just cause warning messages on the AM (in the explicit kill case). - Tone down a log message about the executor being lost when it exited normally (e.g. preemption) - Wake up the AM monitor thread when requests for executor loss reasons arrive too, so that we can more quickly remove executors from this zombie state. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #8887 from vanzin/SPARK-10622.
Diffstat (limited to 'core/src/test')
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala36
1 files changed, 36 insertions, 0 deletions
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index c2edd4c317..2afb595e6f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -237,4 +237,40 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L
}
}
+ test("tasks are not re-scheduled while executor loss reason is pending") {
+ sc = new SparkContext("local", "TaskSchedulerImplSuite")
+ val taskScheduler = new TaskSchedulerImpl(sc)
+ taskScheduler.initialize(new FakeSchedulerBackend)
+ // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
+ new DAGScheduler(sc, taskScheduler) {
+ override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
+ override def executorAdded(execId: String, host: String) {}
+ }
+
+ val e0Offers = Seq(new WorkerOffer("executor0", "host0", 1))
+ val e1Offers = Seq(new WorkerOffer("executor1", "host0", 1))
+ val attempt1 = FakeTask.createTaskSet(1)
+
+ // submit attempt 1, offer resources, task gets scheduled
+ taskScheduler.submitTasks(attempt1)
+ val taskDescriptions = taskScheduler.resourceOffers(e0Offers).flatten
+ assert(1 === taskDescriptions.length)
+
+ // mark executor0 as dead but pending fail reason
+ taskScheduler.executorLost("executor0", LossReasonPending)
+
+ // offer some more resources on a different executor, nothing should change
+ val taskDescriptions2 = taskScheduler.resourceOffers(e1Offers).flatten
+ assert(0 === taskDescriptions2.length)
+
+ // provide the actual loss reason for executor0
+ taskScheduler.executorLost("executor0", SlaveLost("oops"))
+
+ // executor0's tasks should have failed now that the loss reason is known, so offering more
+ // resources should make them be scheduled on the new executor.
+ val taskDescriptions3 = taskScheduler.resourceOffers(e1Offers).flatten
+ assert(1 === taskDescriptions3.length)
+ assert("executor1" === taskDescriptions3(0).executorId)
+ }
+
}