diff options
author | Charles Reiss <charles@eecs.berkeley.edu> | 2013-01-30 09:22:07 -0800 |
---|---|---|
committer | Charles Reiss <charles@eecs.berkeley.edu> | 2013-01-30 09:22:07 -0800 |
commit | 9c0bae75ade9e5b5a69077a5719adf4ee96e2c2e (patch) | |
tree | aa4efc17e7ac7bec254dfb0f8058e4723e2ffb07 | |
parent | 178b89204c9dbee36886e757ddaafbd079672f4a (diff) | |
download | spark-9c0bae75ade9e5b5a69077a5719adf4ee96e2c2e.tar.gz spark-9c0bae75ade9e5b5a69077a5719adf4ee96e2c2e.tar.bz2 spark-9c0bae75ade9e5b5a69077a5719adf4ee96e2c2e.zip |
Change DAGSchedulerSuite to run DAGScheduler in the same Thread.
-rw-r--r-- | core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala | 568 |
1 files changed, 319 insertions, 249 deletions
diff --git a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala index 6c577c2685..89173540d4 100644 --- a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala @@ -4,12 +4,12 @@ import scala.collection.mutable.{Map, HashMap} import org.scalatest.FunSuite import org.scalatest.BeforeAndAfter -import org.scalatest.concurrent.AsyncAssertions import org.scalatest.concurrent.TimeLimitedTests import org.scalatest.mock.EasyMockSugar import org.scalatest.time.{Span, Seconds} import org.easymock.EasyMock._ +import org.easymock.Capture import org.easymock.EasyMock import org.easymock.{IAnswer, IArgumentMatcher} @@ -30,33 +30,55 @@ import spark.TaskEndReason import spark.{FetchFailed, Success} -class DAGSchedulerSuite extends FunSuite - with BeforeAndAfter with EasyMockSugar with TimeLimitedTests - with AsyncAssertions with spark.Logging { +class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar with TimeLimitedTests { - // If we crash the DAGScheduler thread, our test will probably hang. + // impose a time limit on this test in case we don't let the job finish. override val timeLimit = Span(5, Seconds) val sc: SparkContext = new SparkContext("local", "DAGSchedulerSuite") var scheduler: DAGScheduler = null - var w: Waiter = null val taskScheduler = mock[TaskScheduler] val blockManagerMaster = mock[BlockManagerMaster] var mapOutputTracker: MapOutputTracker = null var schedulerThread: Thread = null var schedulerException: Throwable = null + + /** Set of EasyMock argument matchers that match a TaskSet for a given RDD. + * We cache these so we do not create duplicate matchers for the same RDD. + * This allows us to easily setup a sequence of expectations for task sets for + * that RDD. + */ val taskSetMatchers = new HashMap[MyRDD, IArgumentMatcher] + + /** Set of cache locations to return from our mock BlockManagerMaster. + * Keys are (rdd ID, partition ID). Anything not present will return an empty + * list of cache locations silently. + */ val cacheLocations = new HashMap[(Int, Int), Seq[BlockManagerId]] - implicit val mocks = MockObjects(taskScheduler, blockManagerMaster) + /** JobWaiter for the last JobSubmitted event we pushed. To keep tests (most of which + * will only submit one job) from needing to explicitly track it. + */ + var lastJobWaiter: JobWaiter = null - def makeBlockManagerId(host: String): BlockManagerId = - BlockManagerId("exec-" + host, host, 12345) + /** Tell EasyMockSugar what mock objects we want to be configured by expecting {...} + * and whenExecuting {...} */ + implicit val mocks = MockObjects(taskScheduler, blockManagerMaster) + /** Utility function to reset mocks and set expectations on them. EasyMock wants mock objects + * to be reset after each time their expectations are set, and we tend to check mock object + * calls over a single call to DAGScheduler. + * + * We also set a default expectation here that blockManagerMaster.getLocations can be called + * and will return values from cacheLocations. + */ def resetExpecting(f: => Unit) { reset(taskScheduler) reset(blockManagerMaster) - expecting(f) + expecting { + expectGetLocations() + f + } } before { @@ -70,45 +92,30 @@ class DAGSchedulerSuite extends FunSuite whenExecuting { scheduler = new DAGScheduler(taskScheduler, mapOutputTracker, blockManagerMaster, null) } - w = new Waiter - schedulerException = null - schedulerThread = new Thread("DAGScheduler under test") { - override def run() { - try { - scheduler.run() - } catch { - case t: Throwable => - logError("Got exception in DAGScheduler: ", t) - schedulerException = t - } finally { - w.dismiss() - } - } - } - schedulerThread.start - logInfo("finished before") } after { - logInfo("started after") + assert(scheduler.processEvent(StopDAGScheduler)) resetExpecting { taskScheduler.stop() } whenExecuting { - scheduler.stop - schedulerThread.join - } - w.await() - if (schedulerException != null) { - throw new Exception("Exception caught from scheduler thread", schedulerException) + scheduler.stop() } System.clearProperty("spark.master.port") } - // Type of RDD we use for testing. Note that we should never call the real RDD compute methods. - // This is a pair RDD type so it can always be used in ShuffleDependencies. + def makeBlockManagerId(host: String): BlockManagerId = + BlockManagerId("exec-" + host, host, 12345) + + /** Type of RDD we use for testing. Note that we should never call the real RDD compute methods. + * This is a pair RDD type so it can always be used in ShuffleDependencies. */ type MyRDD = RDD[(Int, Int)] + /** Create an RDD for passing to DAGScheduler. These RDDs will use the dependencies and + * preferredLocations (if any) that are passed to them. They are deliberately not executable + * so we can test that DAGScheduler does not try to execute RDDs locally. + */ def makeRdd( numSplits: Int, dependencies: List[Dependency[_]], @@ -130,6 +137,9 @@ class DAGSchedulerSuite extends FunSuite } } + /** EasyMock matcher method. For use as an argument matcher for a TaskSet whose first task + * is from a particular RDD. + */ def taskSetForRdd(rdd: MyRDD): TaskSet = { val matcher = taskSetMatchers.getOrElseUpdate(rdd, new IArgumentMatcher { @@ -149,6 +159,9 @@ class DAGSchedulerSuite extends FunSuite return null } + /** Setup an EasyMock expectation to repsond to blockManagerMaster.getLocations() called from + * cacheLocations. + */ def expectGetLocations(): Unit = { EasyMock.expect(blockManagerMaster.getLocations(anyObject().asInstanceOf[Array[String]])). andAnswer(new IAnswer[Seq[Seq[BlockManagerId]]] { @@ -171,51 +184,106 @@ class DAGSchedulerSuite extends FunSuite }).anyTimes() } - def expectStageAnd(rdd: MyRDD, results: Seq[(TaskEndReason, Any)], - preferredLocations: Option[Seq[Seq[String]]] = None)(afterSubmit: TaskSet => Unit) { - // TODO: Remember which submission - EasyMock.expect(taskScheduler.submitTasks(taskSetForRdd(rdd))).andAnswer(new IAnswer[Unit] { - override def answer(): Unit = { - val taskSet = getCurrentArguments()(0).asInstanceOf[TaskSet] - for (task <- taskSet.tasks) { - task.generation = mapOutputTracker.getGeneration - } - afterSubmit(taskSet) - preferredLocations match { - case None => - for (taskLocs <- taskSet.tasks.map(_.preferredLocations)) { - w { assert(taskLocs.size === 0) } - } - case Some(locations) => - w { assert(locations.size === taskSet.tasks.size) } - for ((expectLocs, taskLocs) <- - taskSet.tasks.map(_.preferredLocations).zip(locations)) { - w { assert(expectLocs === taskLocs) } - } - } - w { assert(taskSet.tasks.size >= results.size)} - for ((result, i) <- results.zipWithIndex) { - if (i < taskSet.tasks.size) { - scheduler.taskEnded(taskSet.tasks(i), result._1, result._2, Map[Long, Any]()) - } - } + /** Process the supplied event as if it were the top of the DAGScheduler event queue, expecting + * the scheduler not to exit. + * + * After processing the event, submit waiting stages as is done on most iterations of the + * DAGScheduler event loop. + */ + def runEvent(event: DAGSchedulerEvent) { + assert(!scheduler.processEvent(event)) + scheduler.submitWaitingStages() + } + + /** Expect a TaskSet for the specified RDD to be submitted to the TaskScheduler. Should be + * called from a resetExpecting { ... } block. + * + * Returns a easymock Capture that will contain the task set after the stage is submitted. + * Most tests should use interceptStage() instead of this directly. + */ + def expectStage(rdd: MyRDD): Capture[TaskSet] = { + val taskSetCapture = new Capture[TaskSet] + taskScheduler.submitTasks(and(capture(taskSetCapture), taskSetForRdd(rdd))) + return taskSetCapture + } + + /** Expect the supplied code snippet to submit a stage for the specified RDD. + * Return the resulting TaskSet. First marks all the tasks are belonging to the + * current MapOutputTracker generation. + */ + def interceptStage(rdd: MyRDD)(f: => Unit): TaskSet = { + var capture: Capture[TaskSet] = null + resetExpecting { + capture = expectStage(rdd) + } + whenExecuting { + f + } + val taskSet = capture.getValue + for (task <- taskSet.tasks) { + task.generation = mapOutputTracker.getGeneration + } + return taskSet + } + + /** Send the given CompletionEvent messages for the tasks in the TaskSet. */ + def respondToTaskSet(taskSet: TaskSet, results: Seq[(TaskEndReason, Any)]) { + assert(taskSet.tasks.size >= results.size) + for ((result, i) <- results.zipWithIndex) { + if (i < taskSet.tasks.size) { + runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, Map[Long, Any]())) } - }) + } } - def expectStage(rdd: MyRDD, results: Seq[(TaskEndReason, Any)], - preferredLocations: Option[Seq[Seq[String]]] = None) { - expectStageAnd(rdd, results, preferredLocations) { _ => } + /** Assert that the supplied TaskSet has exactly the given preferredLocations. */ + def expectTaskSetLocations(taskSet: TaskSet, locations: Seq[Seq[String]]) { + assert(locations.size === taskSet.tasks.size) + for ((expectLocs, taskLocs) <- + taskSet.tasks.map(_.preferredLocations).zip(locations)) { + assert(expectLocs === taskLocs) + } } - def submitRdd(rdd: MyRDD, allowLocal: Boolean = false): Array[Int] = { - return scheduler.runJob[(Int, Int), Int]( + /** When we submit dummy Jobs, this is the compute function we supply. Except in a local test + * below, we do not expect this function to ever be executed; instead, we will return results + * directly through CompletionEvents. + */ + def jobComputeFunc(context: TaskContext, it: Iterator[(Int, Int)]): Int = + it.next._1.asInstanceOf[Int] + + + /** Start a job to compute the given RDD. Returns the JobWaiter that will + * collect the result of the job via callbacks from DAGScheduler. */ + def submitRdd(rdd: MyRDD, allowLocal: Boolean = false): JobWaiter = { + val (toSubmit, waiter) = scheduler.prepareJob[(Int, Int), Int]( rdd, - (context: TaskContext, it: Iterator[(Int, Int)]) => it.next._1.asInstanceOf[Int], + jobComputeFunc, (0 to (rdd.splits.size - 1)), "test-site", allowLocal ) + lastJobWaiter = waiter + runEvent(toSubmit) + return waiter + } + + /** Assert that a job we started has failed. */ + def expectJobException(waiter: JobWaiter = lastJobWaiter) { + waiter.getResult match { + case JobSucceeded(_) => fail() + case JobFailed(_) => return + } + } + + /** Assert that a job we started has succeeded and has the given result. */ + def expectJobResult(expected: Array[Int], waiter: JobWaiter = lastJobWaiter) { + waiter.getResult match { + case JobSucceeded(answer) => + assert(expected === answer.asInstanceOf[Seq[Int]].toArray ) + case JobFailed(_) => + fail() + } } def makeMapStatus(host: String, reduces: Int): MapStatus = @@ -223,24 +291,14 @@ class DAGSchedulerSuite extends FunSuite test("zero split job") { val rdd = makeRdd(0, Nil) - resetExpecting { - expectGetLocations() - // deliberately expect no stages to be submitted - } - whenExecuting { - assert(submitRdd(rdd) === Array[Int]()) - } + assert(scheduler.runJob(rdd, jobComputeFunc, Seq(), "test-site", false) === Array[Int]()) } test("run trivial job") { val rdd = makeRdd(1, Nil) - resetExpecting { - expectGetLocations() - expectStage(rdd, List( (Success, 42) )) - } - whenExecuting { - assert(submitRdd(rdd) === Array(42)) - } + val taskSet = interceptStage(rdd) { submitRdd(rdd) } + respondToTaskSet(taskSet, List( (Success, 42) )) + expectJobResult(Array(42)) } test("local job") { @@ -251,51 +309,34 @@ class DAGSchedulerSuite extends FunSuite override def getPreferredLocations(split: Split) = Nil override def toString = "DAGSchedulerSuite Local RDD" } - resetExpecting { - expectGetLocations() - // deliberately expect no stages to be submitted - } - whenExecuting { - assert(submitRdd(rdd, true) === Array(42)) - } + submitRdd(rdd, true) + expectJobResult(Array(42)) } test("run trivial job w/ dependency") { val baseRdd = makeRdd(1, Nil) val finalRdd = makeRdd(1, List(new OneToOneDependency(baseRdd))) - resetExpecting { - expectGetLocations() - expectStage(finalRdd, List( (Success, 42) )) - } - whenExecuting { - assert(submitRdd(finalRdd) === Array(42)) - } + val taskSet = interceptStage(finalRdd) { submitRdd(finalRdd) } + respondToTaskSet(taskSet, List( (Success, 42) )) + expectJobResult(Array(42)) } - test("location preferences w/ dependency") { + test("cache location preferences w/ dependency") { val baseRdd = makeRdd(1, Nil) val finalRdd = makeRdd(1, List(new OneToOneDependency(baseRdd))) - resetExpecting { - expectGetLocations() - cacheLocations(baseRdd.id -> 0) = - Seq(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")) - expectStage(finalRdd, List( (Success, 42) ), - Some(List(Seq("hostA", "hostB")))) - } - whenExecuting { - assert(submitRdd(finalRdd) === Array(42)) - } + cacheLocations(baseRdd.id -> 0) = + Seq(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")) + val taskSet = interceptStage(finalRdd) { submitRdd(finalRdd) } + expectTaskSetLocations(taskSet, List(Seq("hostA", "hostB"))) + respondToTaskSet(taskSet, List( (Success, 42) )) + expectJobResult(Array(42)) } test("trivial job failure") { val rdd = makeRdd(1, Nil) - resetExpecting { - expectGetLocations() - expectStageAnd(rdd, List()) { taskSet => scheduler.taskSetFailed(taskSet, "test failure") } - } - whenExecuting(taskScheduler, blockManagerMaster) { - intercept[SparkException] { submitRdd(rdd) } - } + val taskSet = interceptStage(rdd) { submitRdd(rdd) } + runEvent(TaskSetFailed(taskSet, "test failure")) + expectJobException() } test("run trivial shuffle") { @@ -304,20 +345,17 @@ class DAGSchedulerSuite extends FunSuite val shuffleId = shuffleDep.shuffleId val reduceRdd = makeRdd(1, List(shuffleDep)) - resetExpecting { - expectGetLocations() - expectStage(shuffleMapRdd, List( + val firstStage = interceptStage(shuffleMapRdd) { submitRdd(reduceRdd) } + val secondStage = interceptStage(reduceRdd) { + respondToTaskSet(firstStage, List( (Success, makeMapStatus("hostA", 1)), (Success, makeMapStatus("hostB", 1)) )) - expectStageAnd(reduceRdd, List( (Success, 42) )) { _ => - w { assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === - Array(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))) } - } - } - whenExecuting { - assert(submitRdd(reduceRdd) === Array(42)) } + assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === + Array(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))) + respondToTaskSet(secondStage, List( (Success, 42) )) + expectJobResult(Array(42)) } test("run trivial shuffle with fetch failure") { @@ -326,28 +364,32 @@ class DAGSchedulerSuite extends FunSuite val shuffleId = shuffleDep.shuffleId val reduceRdd = makeRdd(2, List(shuffleDep)) - resetExpecting { - expectGetLocations() - expectStage(shuffleMapRdd, List( + val firstStage = interceptStage(shuffleMapRdd) { submitRdd(reduceRdd) } + val secondStage = interceptStage(reduceRdd) { + respondToTaskSet(firstStage, List( (Success, makeMapStatus("hostA", 1)), (Success, makeMapStatus("hostB", 1)) )) + } + resetExpecting { blockManagerMaster.removeExecutor("exec-hostA") - expectStage(reduceRdd, List( + } + whenExecuting { + respondToTaskSet(secondStage, List( (Success, 42), (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0), null) )) - // partial recompute - expectStage(shuffleMapRdd, List( (Success, makeMapStatus("hostA", 1)) )) - expectStageAnd(reduceRdd, List( (Success, 43) )) { _ => - w { assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === - Array(makeBlockManagerId("hostA"), - makeBlockManagerId("hostB"))) } - } } - whenExecuting { - assert(submitRdd(reduceRdd) === Array(42, 43)) + val thirdStage = interceptStage(shuffleMapRdd) { + scheduler.resubmitFailedStages() + } + val fourthStage = interceptStage(reduceRdd) { + respondToTaskSet(thirdStage, List( (Success, makeMapStatus("hostA", 1)) )) } + assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === + Array(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))) + respondToTaskSet(fourthStage, List( (Success, 43) )) + expectJobResult(Array(42, 43)) } test("ignore late map task completions") { @@ -356,63 +398,64 @@ class DAGSchedulerSuite extends FunSuite val shuffleId = shuffleDep.shuffleId val reduceRdd = makeRdd(2, List(shuffleDep)) + val taskSet = interceptStage(shuffleMapRdd) { submitRdd(reduceRdd) } + val oldGeneration = mapOutputTracker.getGeneration resetExpecting { - expectGetLocations() - expectStageAnd(shuffleMapRdd, List( - (Success, makeMapStatus("hostA", 1)) - )) { taskSet => - val newGeneration = mapOutputTracker.getGeneration + 1 - scheduler.executorLost("exec-hostA") - val noAccum = Map[Long, Any]() - // We rely on the event queue being ordered and increasing the generation number by 1 - // should be ignored for being too old - scheduler.taskEnded(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum) - // should work because it's a non-failed host - scheduler.taskEnded(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), noAccum) - // should be ignored for being too old - scheduler.taskEnded(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum) - // should be ignored (not end the stage) because it's too old - scheduler.taskEnded(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), noAccum) - taskSet.tasks(1).generation = newGeneration - scheduler.taskEnded(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), noAccum) - } blockManagerMaster.removeExecutor("exec-hostA") - expectStageAnd(reduceRdd, List( - (Success, 42), (Success, 43) - )) { _ => - w { assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === - Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA"))) } - } } whenExecuting { - assert(submitRdd(reduceRdd) === Array(42, 43)) - } + runEvent(ExecutorLost("exec-hostA")) + } + val newGeneration = mapOutputTracker.getGeneration + assert(newGeneration > oldGeneration) + val noAccum = Map[Long, Any]() + // We rely on the event queue being ordered and increasing the generation number by 1 + // should be ignored for being too old + runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum)) + // should work because it's a non-failed host + runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), noAccum)) + // should be ignored for being too old + runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum)) + taskSet.tasks(1).generation = newGeneration + val secondStage = interceptStage(reduceRdd) { + runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), noAccum)) + } + assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === + Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA"))) + respondToTaskSet(secondStage, List( (Success, 42), (Success, 43) )) + expectJobResult(Array(42, 43)) } - test("run trivial shuffle with out-of-band failure") { + test("run trivial shuffle with out-of-band failure and retry") { val shuffleMapRdd = makeRdd(2, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) val shuffleId = shuffleDep.shuffleId val reduceRdd = makeRdd(1, List(shuffleDep)) + + val firstStage = interceptStage(shuffleMapRdd) { submitRdd(reduceRdd) } resetExpecting { - expectGetLocations() blockManagerMaster.removeExecutor("exec-hostA") - expectStageAnd(shuffleMapRdd, List( + } + whenExecuting { + runEvent(ExecutorLost("exec-hostA")) + } + // DAGScheduler will immediately resubmit the stage after it appears to have no pending tasks + // rather than marking it is as failed and waiting. + val secondStage = interceptStage(shuffleMapRdd) { + respondToTaskSet(firstStage, List( (Success, makeMapStatus("hostA", 1)), (Success, makeMapStatus("hostB", 1)) - )) { _ => scheduler.executorLost("exec-hostA") } - expectStage(shuffleMapRdd, List( - (Success, makeMapStatus("hostC", 1)) )) - expectStageAnd(reduceRdd, List( (Success, 42) )) { _ => - w { assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === - Array(makeBlockManagerId("hostC"), - makeBlockManagerId("hostB"))) } - } } - whenExecuting { - assert(submitRdd(reduceRdd) === Array(42)) + val thirdStage = interceptStage(reduceRdd) { + respondToTaskSet(secondStage, List( + (Success, makeMapStatus("hostC", 1)) + )) } + assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === + Array(makeBlockManagerId("hostC"), makeBlockManagerId("hostB"))) + respondToTaskSet(thirdStage, List( (Success, 42) )) + expectJobResult(Array(42)) } test("recursive shuffle failures") { @@ -422,34 +465,42 @@ class DAGSchedulerSuite extends FunSuite val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null) val finalRdd = makeRdd(1, List(shuffleDepTwo)) - resetExpecting { - expectGetLocations() - expectStage(shuffleOneRdd, List( - (Success, makeMapStatus("hostA", 1)), - (Success, makeMapStatus("hostB", 1)) + val firstStage = interceptStage(shuffleOneRdd) { submitRdd(finalRdd) } + val secondStage = interceptStage(shuffleTwoRdd) { + respondToTaskSet(firstStage, List( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2)) )) - expectStage(shuffleTwoRdd, List( + } + val thirdStage = interceptStage(finalRdd) { + respondToTaskSet(secondStage, List( (Success, makeMapStatus("hostA", 1)), (Success, makeMapStatus("hostC", 1)) )) + } + resetExpecting { blockManagerMaster.removeExecutor("exec-hostA") - expectStage(finalRdd, List( + } + whenExecuting { + respondToTaskSet(thirdStage, List( (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null) )) - // triggers a partial recompute of the first stage, then the second - expectStage(shuffleOneRdd, List( - (Success, makeMapStatus("hostA", 1)) + } + val recomputeOne = interceptStage(shuffleOneRdd) { + scheduler.resubmitFailedStages + } + val recomputeTwo = interceptStage(shuffleTwoRdd) { + respondToTaskSet(recomputeOne, List( + (Success, makeMapStatus("hostA", 2)) )) - expectStage(shuffleTwoRdd, List( + } + val finalStage = interceptStage(finalRdd) { + respondToTaskSet(recomputeTwo, List( (Success, makeMapStatus("hostA", 1)) )) - expectStage(finalRdd, List( - (Success, 42) - )) - } - whenExecuting { - assert(submitRdd(finalRdd) === Array(42)) } + respondToTaskSet(finalStage, List( (Success, 42) )) + expectJobResult(Array(42)) } test("cached post-shuffle") { @@ -459,35 +510,41 @@ class DAGSchedulerSuite extends FunSuite val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null) val finalRdd = makeRdd(1, List(shuffleDepTwo)) - resetExpecting { - expectGetLocations() - expectStage(shuffleOneRdd, List( + val firstShuffleStage = interceptStage(shuffleOneRdd) { submitRdd(finalRdd) } + cacheLocations(shuffleTwoRdd.id -> 0) = Seq(makeBlockManagerId("hostD")) + cacheLocations(shuffleTwoRdd.id -> 1) = Seq(makeBlockManagerId("hostC")) + val secondShuffleStage = interceptStage(shuffleTwoRdd) { + respondToTaskSet(firstShuffleStage, List( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2)) + )) + } + val reduceStage = interceptStage(finalRdd) { + respondToTaskSet(secondShuffleStage, List( (Success, makeMapStatus("hostA", 1)), (Success, makeMapStatus("hostB", 1)) )) - expectStageAnd(shuffleTwoRdd, List( - (Success, makeMapStatus("hostA", 1)), - (Success, makeMapStatus("hostC", 1)) - )){ _ => - cacheLocations(shuffleTwoRdd.id -> 0) = Seq(makeBlockManagerId("hostD")) - cacheLocations(shuffleTwoRdd.id -> 1) = Seq(makeBlockManagerId("hostC")) - } + } + resetExpecting { blockManagerMaster.removeExecutor("exec-hostA") - expectStage(finalRdd, List( + } + whenExecuting { + respondToTaskSet(reduceStage, List( (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null) )) - // since we have a cached copy of the missing split of shuffleTwoRdd, we shouldn't - // immediately try to rerun shuffleOneRdd: - expectStage(shuffleTwoRdd, List( + } + // DAGScheduler should notice the cached copy of the second shuffle and try to get it rerun. + val recomputeTwo = interceptStage(shuffleTwoRdd) { + scheduler.resubmitFailedStages() + } + expectTaskSetLocations(recomputeTwo, Seq(Seq("hostD"))) + val finalRetry = interceptStage(finalRdd) { + respondToTaskSet(recomputeTwo, List( (Success, makeMapStatus("hostD", 1)) - ), Some(Seq(List("hostD")))) - expectStage(finalRdd, List( - (Success, 42) )) } - whenExecuting { - assert(submitRdd(finalRdd) === Array(42)) - } + respondToTaskSet(finalRetry, List( (Success, 42) )) + expectJobResult(Array(42)) } test("cached post-shuffle but fails") { @@ -497,45 +554,58 @@ class DAGSchedulerSuite extends FunSuite val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null) val finalRdd = makeRdd(1, List(shuffleDepTwo)) - resetExpecting { - expectGetLocations() - expectStage(shuffleOneRdd, List( + val firstShuffleStage = interceptStage(shuffleOneRdd) { submitRdd(finalRdd) } + cacheLocations(shuffleTwoRdd.id -> 0) = Seq(makeBlockManagerId("hostD")) + cacheLocations(shuffleTwoRdd.id -> 1) = Seq(makeBlockManagerId("hostC")) + val secondShuffleStage = interceptStage(shuffleTwoRdd) { + respondToTaskSet(firstShuffleStage, List( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2)) + )) + } + val reduceStage = interceptStage(finalRdd) { + respondToTaskSet(secondShuffleStage, List( (Success, makeMapStatus("hostA", 1)), (Success, makeMapStatus("hostB", 1)) )) - expectStageAnd(shuffleTwoRdd, List( - (Success, makeMapStatus("hostA", 1)), - (Success, makeMapStatus("hostC", 1)) - )){ _ => - cacheLocations(shuffleTwoRdd.id -> 0) = Seq(makeBlockManagerId("hostD")) - cacheLocations(shuffleTwoRdd.id -> 1) = Seq(makeBlockManagerId("hostC")) - } + } + resetExpecting { blockManagerMaster.removeExecutor("exec-hostA") - expectStage(finalRdd, List( + } + whenExecuting { + respondToTaskSet(reduceStage, List( (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null) )) - // since we have a cached copy of the missing split of shuffleTwoRdd, we shouldn't - // immediately try to rerun shuffleOneRdd: - expectStageAnd(shuffleTwoRdd, List( - (FetchFailed(null, shuffleDepOne.shuffleId, 0, 0), null) - ), Some(Seq(List("hostD")))) { _ => - w { - intercept[FetchFailedException]{ - mapOutputTracker.getServerStatuses(shuffleDepOne.shuffleId, 0) - } - } - cacheLocations.remove(shuffleTwoRdd.id -> 0) - } - // after that fetch failure, we should refetch the cache locations and try to recompute - // the whole chain. Note that we will ignore that a fetch failure previously occured on - // this host. - expectStage(shuffleOneRdd, List( (Success, makeMapStatus("hostA", 1)) )) - expectStage(shuffleTwoRdd, List( (Success, makeMapStatus("hostA", 1)) )) - expectStage(finalRdd, List( (Success, 42) )) } - whenExecuting { - assert(submitRdd(finalRdd) === Array(42)) + val recomputeTwoCached = interceptStage(shuffleTwoRdd) { + scheduler.resubmitFailedStages() + } + expectTaskSetLocations(recomputeTwoCached, Seq(Seq("hostD"))) + intercept[FetchFailedException]{ + mapOutputTracker.getServerStatuses(shuffleDepOne.shuffleId, 0) + } + + // Simulate the shuffle input data failing to be cached. + cacheLocations.remove(shuffleTwoRdd.id -> 0) + respondToTaskSet(recomputeTwoCached, List( + (FetchFailed(null, shuffleDepOne.shuffleId, 0, 0), null) + )) + + // After the fetch failure, DAGScheduler should recheck the cache and decide to resubmit + // everything. + val recomputeOne = interceptStage(shuffleOneRdd) { + scheduler.resubmitFailedStages() } + // We use hostA here to make sure DAGScheduler doesn't think it's still dead. + val recomputeTwoUncached = interceptStage(shuffleTwoRdd) { + respondToTaskSet(recomputeOne, List( (Success, makeMapStatus("hostA", 1)) )) + } + expectTaskSetLocations(recomputeTwoUncached, Seq(Seq[String]())) + val finalRetry = interceptStage(finalRdd) { + respondToTaskSet(recomputeTwoUncached, List( (Success, makeMapStatus("hostA", 1)) )) + + } + respondToTaskSet(finalRetry, List( (Success, 42) )) + expectJobResult(Array(42)) } } - |