aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKay Ousterhout <kayousterhout@gmail.com>2017-02-24 11:42:45 -0800
committerKay Ousterhout <kayousterhout@gmail.com>2017-02-24 11:42:45 -0800
commit5cbd3b59ba6735c59153416fa15721af6da09acf (patch)
tree5e2de952824e66e8d6833800bc28d852f3301903
parent69d0da6373979ce5b2bcd52933b5a7660d893e88 (diff)
downloadspark-5cbd3b59ba6735c59153416fa15721af6da09acf.tar.gz
spark-5cbd3b59ba6735c59153416fa15721af6da09acf.tar.bz2
spark-5cbd3b59ba6735c59153416fa15721af6da09acf.zip
[SPARK-19560] Improve DAGScheduler tests.
This commit improves the tests that check the case when a ShuffleMapTask completes successfully on an executor that has failed. This commit improves the commenting around the existing test for this, and adds some additional checks to make it more clear what went wrong if the tests fail (the fact that these tests are hard to understand came up in the context of markhamstra's proposed fix for #16620). This commit also removes a test that I realized tested exactly the same functionality. markhamstra, I verified that the new version of the test still fails (and in a more helpful way) for your proposed change for #16620. Author: Kay Ousterhout <kayousterhout@gmail.com> Closes #16892 from kayousterhout/SPARK-19560.
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala58
1 files changed, 49 insertions, 9 deletions
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index c735220da2..8eaf9dfcf4 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -1569,24 +1569,45 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
assertDataStructuresEmpty()
}
- test("run trivial shuffle with out-of-band failure and retry") {
+ /**
+ * In this test, we run a map stage where one of the executors fails but we still receive a
+ * "zombie" complete message from a task that ran on that executor. We want to make sure the
+ * stage is resubmitted so that the task that ran on the failed executor is re-executed, and
+ * that the stage is only marked as finished once that task completes.
+ */
+ test("run trivial shuffle with out-of-band executor failure and retry") {
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
val shuffleId = shuffleDep.shuffleId
val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
submit(reduceRdd, Array(0))
- // blockManagerMaster.removeExecutor("exec-hostA")
- // pretend we were told hostA went away
+ // Tell the DAGScheduler that hostA was lost.
runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
- // DAGScheduler will immediately resubmit the stage after it appears to have no pending tasks
- // rather than marking it is as failed and waiting.
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 1)),
(Success, makeMapStatus("hostB", 1))))
+
+ // At this point, no more tasks are running for the stage (and the TaskSetManager considers the
+ // stage complete), but the tasks that ran on HostA need to be re-run, so the DAGScheduler
+ // should re-submit the stage with one task (the task that originally ran on HostA).
+ assert(taskSets.size === 2)
+ assert(taskSets(1).tasks.size === 1)
+
+ // Make sure that the stage that was re-submitted was the ShuffleMapStage (not the reduce
+ // stage, which shouldn't be run until all of the tasks in the ShuffleMapStage complete on
+ // alive executors).
+ assert(taskSets(1).tasks(0).isInstanceOf[ShuffleMapTask])
+
// have hostC complete the resubmitted task
complete(taskSets(1), Seq((Success, makeMapStatus("hostC", 1))))
assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
HashSet(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
+
+ // Make sure that the reduce stage was now submitted.
+ assert(taskSets.size === 3)
+ assert(taskSets(2).tasks(0).isInstanceOf[ResultTask[_, _]])
+
+ // Complete the reduce stage.
complete(taskSets(2), Seq((Success, 42)))
assert(results === Map(0 -> 42))
assertDataStructuresEmpty()
@@ -2031,6 +2052,11 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
* In this test, we run a map stage where one of the executors fails but we still receive a
* "zombie" complete message from that executor. We want to make sure the stage is not reported
* as done until all tasks have completed.
+ *
+ * Most of the functionality in this test is tested in "run trivial shuffle with out-of-band
+ * executor failure and retry". However, that test uses ShuffleMapStages that are followed by
+ * a ResultStage, whereas in this test, the ShuffleMapStage is tested in isolation, without a
+ * ResultStage after it.
*/
test("map stage submission with executor failure late map task completions") {
val shuffleMapRdd = new MyRDD(sc, 3, Nil)
@@ -2042,7 +2068,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
runEvent(makeCompletionEvent(oldTaskSet.tasks(0), Success, makeMapStatus("hostA", 2)))
assert(results.size === 0) // Map stage job should not be complete yet
- // Pretend host A was lost
+ // Pretend host A was lost. This will cause the TaskSetManager to resubmit task 0, because it
+ // completed on hostA.
val oldEpoch = mapOutputTracker.getEpoch
runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
val newEpoch = mapOutputTracker.getEpoch
@@ -2054,13 +2081,26 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
// A completion from another task should work because it's a non-failed host
runEvent(makeCompletionEvent(oldTaskSet.tasks(2), Success, makeMapStatus("hostB", 2)))
- assert(results.size === 0) // Map stage job should not be complete yet
+
+ // At this point, no more tasks are running for the stage (and the TaskSetManager considers
+ // the stage complete), but the task that ran on hostA needs to be re-run, so the map stage
+ // shouldn't be marked as complete, and the DAGScheduler should re-submit the stage.
+ assert(results.size === 0)
+ assert(taskSets.size === 2)
// Now complete tasks in the second task set
val newTaskSet = taskSets(1)
- assert(newTaskSet.tasks.size === 2) // Both tasks 0 and 1 were on hostA
+ // 2 tasks should have been re-submitted, for tasks 0 and 1 (which ran on hostA).
+ assert(newTaskSet.tasks.size === 2)
+ // Complete task 0 from the original task set (i.e., not hte one that's currently active).
+ // This should still be counted towards the job being complete (but there's still one
+ // outstanding task).
runEvent(makeCompletionEvent(newTaskSet.tasks(0), Success, makeMapStatus("hostB", 2)))
- assert(results.size === 0) // Map stage job should not be complete yet
+ assert(results.size === 0)
+
+ // Complete the final task, from the currently active task set. There's still one
+ // running task, task 0 in the currently active stage attempt, but the success of task 0 means
+ // the DAGScheduler can mark the stage as finished.
runEvent(makeCompletionEvent(newTaskSet.tasks(1), Success, makeMapStatus("hostB", 2)))
assert(results.size === 1) // Map stage job should now finally be complete
assertDataStructuresEmpty()