 core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 58 +++++++++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 49 insertions(+), 9 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index c735220da2..8eaf9dfcf4 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -1569,24 +1569,45 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
assertDataStructuresEmpty()
}
- test("run trivial shuffle with out-of-band failure and retry") {
+ /**
+ * In this test, we run a map stage where one of the executors fails but we still receive a
+ * "zombie" complete message from a task that ran on that executor. We want to make sure the
+ * stage is resubmitted so that the task that ran on the failed executor is re-executed, and
+ * that the stage is only marked as finished once that task completes.
+ */
+ test("run trivial shuffle with out-of-band executor failure and retry") {
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
val shuffleId = shuffleDep.shuffleId
val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
submit(reduceRdd, Array(0))
- // blockManagerMaster.removeExecutor("exec-hostA")
- // pretend we were told hostA went away
+ // Tell the DAGScheduler that hostA was lost.
runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
- // DAGScheduler will immediately resubmit the stage after it appears to have no pending tasks
- // rather than marking it is as failed and waiting.
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 1)),
(Success, makeMapStatus("hostB", 1))))
+
+ // At this point, no more tasks are running for the stage (and the TaskSetManager considers the
+ // stage complete), but the task that ran on hostA needs to be re-run, so the DAGScheduler
+ // should re-submit the stage with one task (the task that originally ran on hostA).
+ assert(taskSets.size === 2)
+ assert(taskSets(1).tasks.size === 1)
+
+ // Make sure that the stage that was re-submitted was the ShuffleMapStage (not the reduce
+ // stage, which shouldn't be run until all of the tasks in the ShuffleMapStage complete on
+ // live executors).
+ assert(taskSets(1).tasks(0).isInstanceOf[ShuffleMapTask])
+
// have hostC complete the resubmitted task
complete(taskSets(1), Seq((Success, makeMapStatus("hostC", 1))))
assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
HashSet(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
+
+ // Make sure that the reduce stage has now been submitted.
+ assert(taskSets.size === 3)
+ assert(taskSets(2).tasks(0).isInstanceOf[ResultTask[_, _]])
+
+ // Complete the reduce stage.
complete(taskSets(2), Seq((Success, 42)))
assert(results === Map(0 -> 42))
assertDataStructuresEmpty()
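
The hunk above hinges on two pieces of DAGScheduler bookkeeping: when an executor is lost, only the partitions whose map output lived on it are resubmitted (hence taskSets(1) holds a single ShuffleMapTask), and the reduce stage is not submitted until every partition has output on a live executor. Below is a minimal, illustrative sketch of that accounting, not Spark's implementation; OutputTracker and all of its methods are invented names.

// Illustrative sketch only: toy bookkeeping of map-output locations per partition.
// `OutputTracker` and its methods are invented names, not Spark APIs.
class OutputTracker(numPartitions: Int) {
  // partition index -> executor that holds its map output (None if it must be re-run)
  private val outputs = Array.fill[Option[String]](numPartitions)(None)

  def recordOutput(partition: Int, executorId: String): Unit =
    outputs(partition) = Some(executorId)

  // Drop outputs that lived on the lost executor and return the partitions to resubmit.
  def executorLost(executorId: String): Seq[Int] = {
    val missing = outputs.indices.filter(i => outputs(i).contains(executorId))
    missing.foreach(i => outputs(i) = None)
    missing
  }

  // The dependent (reduce) stage may only be submitted once every partition has live output.
  def allOutputsAvailable: Boolean = outputs.forall(_.isDefined)
}

object OutputTrackerDemo extends App {
  val tracker = new OutputTracker(numPartitions = 2)
  tracker.recordOutput(0, "exec-hostA")
  tracker.recordOutput(1, "exec-hostB")
  // hostA is lost: only partition 0 needs to be re-run (cf. taskSets(1).tasks.size === 1 above).
  assert(tracker.executorLost("exec-hostA") == Seq(0))
  assert(!tracker.allOutputsAvailable)   // the reduce stage must not be submitted yet
  tracker.recordOutput(0, "exec-hostC")  // the resubmitted task completes on hostC
  assert(tracker.allOutputsAvailable)    // now the reduce stage can run (cf. taskSets(2) above)
}
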
@@ -2031,6 +2052,11 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
* In this test, we run a map stage where one of the executors fails but we still receive a
* "zombie" complete message from that executor. We want to make sure the stage is not reported
* as done until all tasks have completed.
+ *
+ * Most of the functionality in this test is also covered by "run trivial shuffle with
+ * out-of-band executor failure and retry". However, that test uses a ShuffleMapStage that is
+ * followed by a ResultStage, whereas here the ShuffleMapStage is tested in isolation, with no
+ * ResultStage after it.
*/
test("map stage submission with executor failure late map task completions") {
val shuffleMapRdd = new MyRDD(sc, 3, Nil)
@@ -2042,7 +2068,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
runEvent(makeCompletionEvent(oldTaskSet.tasks(0), Success, makeMapStatus("hostA", 2)))
assert(results.size === 0) // Map stage job should not be complete yet
- // Pretend host A was lost
+ // Pretend host A was lost. This will cause the TaskSetManager to resubmit task 0, because it
+ // completed on hostA.
val oldEpoch = mapOutputTracker.getEpoch
runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
val newEpoch = mapOutputTracker.getEpoch
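
The epoch assertion in the hunk above is what makes "zombie" completions harmless: losing an executor bumps the map output tracker's epoch, so map statuses from that host that predate the loss can be discarded. Below is a rough, illustrative sketch of that idea only; StaleFilter and its methods are invented names, not Spark classes.

// Illustrative sketch only, loosely in the spirit of the epoch check above.
// `StaleFilter` and its methods are invented names, not Spark APIs.
class StaleFilter {
  private var epoch: Long = 0L
  private var failedHosts: Set[String] = Set.empty

  def getEpoch: Long = epoch

  // Losing an executor bumps the epoch, so statuses recorded earlier on that host are suspect.
  def hostLost(host: String): Unit = {
    failedHosts += host
    epoch += 1
  }

  // A completion is ignored only if it comes from a failed host and predates the loss.
  def accept(host: String, taskEpoch: Long): Boolean =
    !(failedHosts.contains(host) && taskEpoch < epoch)
}

object StaleFilterDemo extends App {
  val filter = new StaleFilter
  val oldEpoch = filter.getEpoch
  filter.hostLost("hostA")
  assert(filter.getEpoch > oldEpoch)        // mirrors assert(newEpoch > oldEpoch)
  assert(!filter.accept("hostA", oldEpoch)) // a zombie completion from hostA is discarded
  assert(filter.accept("hostB", oldEpoch))  // completions from live hosts still count
}
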
@@ -2054,13 +2081,26 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
// A completion from another task should work because it's a non-failed host
runEvent(makeCompletionEvent(oldTaskSet.tasks(2), Success, makeMapStatus("hostB", 2)))
- assert(results.size === 0) // Map stage job should not be complete yet
+
+ // At this point, no more tasks are running for the stage (and the TaskSetManager considers
+ // the stage complete), but the tasks that ran on hostA need to be re-run, so the map stage
+ // shouldn't be marked as complete, and the DAGScheduler should re-submit the stage.
+ assert(results.size === 0)
+ assert(taskSets.size === 2)
// Now complete tasks in the second task set
val newTaskSet = taskSets(1)
- assert(newTaskSet.tasks.size === 2) // Both tasks 0 and 1 were on hostA
+ // 2 tasks should have been re-submitted, for tasks 0 and 1 (which ran on hostA).
+ assert(newTaskSet.tasks.size === 2)
+ // Complete task 0 from the original task set (i.e., not the one that's currently active).
+ // This should still be counted towards the job being complete (but there's still one
+ // outstanding task).
runEvent(makeCompletionEvent(newTaskSet.tasks(0), Success, makeMapStatus("hostB", 2)))
- assert(results.size === 0) // Map stage job should not be complete yet
+ assert(results.size === 0)
+
+ // Complete the final task, from the currently active task set. There's still one running
+ // task (task 0 in the currently active stage attempt), but task 0's earlier success means
+ // the DAGScheduler can mark the stage as finished.
runEvent(makeCompletionEvent(newTaskSet.tasks(1), Success, makeMapStatus("hostB", 2)))
assert(results.size === 1) // Map stage job should now finally be complete
assertDataStructuresEmpty()
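
The final assertions rely on stage completion being tracked per partition rather than per task attempt: a success from either the original or the resubmitted task set counts, and the stage finishes once every partition has output on a live host, even if a duplicate attempt is still running. Below is a minimal, illustrative sketch under those assumptions; MapStageProgress and its methods are invented names, not Spark APIs.

// Illustrative sketch only: stage completion tracked per partition, not per task attempt, so
// the stage can finish even while a duplicate attempt of an already-finished partition runs.
class MapStageProgress(numPartitions: Int) {
  private var pending = (0 until numPartitions).toSet

  // A success from any attempt of `partition` (on a live host) removes it from the pending set.
  def taskSucceeded(partition: Int): Unit = pending -= partition

  // Losing an executor puts the partitions whose output lived there back into the pending set.
  def outputsLost(partitions: Seq[Int]): Unit = pending ++= partitions

  def isFinished: Boolean = pending.isEmpty
}

object MapStageProgressDemo extends App {
  val stage = new MapStageProgress(numPartitions = 3)
  stage.taskSucceeded(0)        // task 0 completes on hostA
  stage.outputsLost(Seq(0, 1))  // hostA is lost: partitions 0 and 1 must be redone
  stage.taskSucceeded(2)        // partition 2 completes on hostB
  stage.taskSucceeded(0)        // a retry of partition 0 completes on hostB
  assert(!stage.isFinished)     // results.size === 0: partition 1 is still outstanding
  stage.taskSucceeded(1)        // the retry of partition 1 completes on hostB
  assert(stage.isFinished)      // results.size === 1: the map stage job is complete
}
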