From a34096a76de9d07518ce33111ad43b88049c1ac2 Mon Sep 17 00:00:00 2001 From: Charles Reiss Date: Mon, 28 Jan 2013 22:40:16 -0800 Subject: Add easymock to POMs --- core/pom.xml | 5 +++++ pom.xml | 6 ++++++ 2 files changed, 11 insertions(+) diff --git a/core/pom.xml b/core/pom.xml index 862d3ec37a..a2b9b726a6 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -98,6 +98,11 @@ scalacheck_${scala.version} test + + org.easymock + easymock + test + com.novocode junit-interface diff --git a/pom.xml b/pom.xml index 3ea989a082..4a4ff560e7 100644 --- a/pom.xml +++ b/pom.xml @@ -273,6 +273,12 @@ 1.8 test + + org.easymock + easymock + 3.1 + test + org.scalacheck scalacheck_${scala.version} -- cgit v1.2.3 From 0f81025ecadbfd21edb64602658ae8ba26e5bf66 Mon Sep 17 00:00:00 2001 From: Charles Reiss Date: Tue, 29 Jan 2013 18:54:58 -0800 Subject: Add easymock to SBT configuration. --- project/SparkBuild.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 03b8094f7d..af8b5ba017 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -92,7 +92,8 @@ object SparkBuild extends Build { "org.eclipse.jetty" % "jetty-server" % "7.5.3.v20111011", "org.scalatest" %% "scalatest" % "1.8" % "test", "org.scalacheck" %% "scalacheck" % "1.9" % "test", - "com.novocode" % "junit-interface" % "0.8" % "test" + "com.novocode" % "junit-interface" % "0.8" % "test", + "org.easymock" % "easymock" % "3.1" % "test" ), parallelExecution := false, /* Workaround for issue #206 (fixed after SBT 0.11.0) */ -- cgit v1.2.3 From a3d14c0404d6b28433784f84086a29ecc0045a12 Mon Sep 17 00:00:00 2001 From: Charles Reiss Date: Mon, 28 Jan 2013 22:41:08 -0800 Subject: Refactoring to DAGScheduler to aid testing --- core/src/main/scala/spark/SparkContext.scala | 1 + .../main/scala/spark/scheduler/DAGScheduler.scala | 29 +++++++++++++--------- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index dc9b8688b3..6ae04f4a44 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -187,6 +187,7 @@ class SparkContext( taskScheduler.start() private var dagScheduler = new DAGScheduler(taskScheduler) + dagScheduler.start() /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */ val hadoopConfiguration = { diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index b130be6a38..9655961162 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -23,7 +23,14 @@ import util.{MetadataCleaner, TimeStampedHashMap} * and to report fetch failures (the submitTasks method, and code to add CompletionEvents). */ private[spark] -class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with Logging { +class DAGScheduler(taskSched: TaskScheduler, + mapOutputTracker: MapOutputTracker, + blockManagerMaster: BlockManagerMaster, + env: SparkEnv) + extends TaskSchedulerListener with Logging { + def this(taskSched: TaskScheduler) { + this(taskSched, SparkEnv.get.mapOutputTracker, SparkEnv.get.blockManager.master, SparkEnv.get) + } taskSched.setListener(this) // Called by TaskScheduler to report task completions or failures. 
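Note on the constructor change above: passing the MapOutputTracker, BlockManagerMaster and SparkEnv through the primary constructor, while keeping an auxiliary one-argument constructor that still reads them from SparkEnv.get, is what lets the test suite added later build a DAGScheduler around mocks (and a null SparkEnv) instead of global state. A minimal sketch of that injection pattern in isolation, using made-up Clock/Reporter types rather than Spark classes:

    trait Clock { def nowMillis(): Long }
    object SystemClock extends Clock { def nowMillis() = System.currentTimeMillis() }

    class Reporter(clock: Clock) {
      // Auxiliary constructor keeps existing call sites working, just like
      // DAGScheduler's one-argument def this(taskSched) above.
      def this() = this(SystemClock)
      def stamp(msg: String): String = clock.nowMillis() + " " + msg
    }

    object InjectionSketch extends App {
      val fixedClock = new Clock { def nowMillis() = 42L }  // stands in for a mock
      assert(new Reporter(fixedClock).stamp("ok") == "42 ok")
      println(new Reporter().stamp("uses the real system clock"))
    }
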
@@ -66,10 +73,6 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with var cacheLocs = new HashMap[Int, Array[List[String]]] - val env = SparkEnv.get - val mapOutputTracker = env.mapOutputTracker - val blockManagerMaster = env.blockManager.master - // For tracking failed nodes, we use the MapOutputTracker's generation number, which is // sent with every task. When we detect a node failing, we note the current generation number // and failed executor, increment it for new tasks, and use this to ignore stray ShuffleMapTask @@ -90,12 +93,14 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with val metadataCleaner = new MetadataCleaner("DAGScheduler", this.cleanup) // Start a thread to run the DAGScheduler event loop - new Thread("DAGScheduler") { - setDaemon(true) - override def run() { - DAGScheduler.this.run() - } - }.start() + def start() { + new Thread("DAGScheduler") { + setDaemon(true) + override def run() { + DAGScheduler.this.run() + } + }.start() + } def getCacheLocs(rdd: RDD[_]): Array[List[String]] = { if (!cacheLocs.contains(rdd.id)) { @@ -546,7 +551,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with if (!failedGeneration.contains(execId) || failedGeneration(execId) < currentGeneration) { failedGeneration(execId) = currentGeneration logInfo("Executor lost: %s (generation %d)".format(execId, currentGeneration)) - env.blockManager.master.removeExecutor(execId) + blockManagerMaster.removeExecutor(execId) // TODO: This will be really slow if we keep accumulating shuffle map stages for ((shuffleId, stage) <- shuffleToMapStage) { stage.removeOutputsOnExecutor(execId) -- cgit v1.2.3 From 9eac7d01f0880d1d3d51e922ef2566c4ee92989f Mon Sep 17 00:00:00 2001 From: Charles Reiss Date: Mon, 28 Jan 2013 22:42:35 -0800 Subject: Add DAGScheduler tests. --- .../scala/spark/scheduler/DAGSchedulerSuite.scala | 540 +++++++++++++++++++++ 1 file changed, 540 insertions(+) create mode 100644 core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala diff --git a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala new file mode 100644 index 0000000000..53f5214d7a --- /dev/null +++ b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala @@ -0,0 +1,540 @@ +package spark.scheduler + +import scala.collection.mutable.{Map, HashMap} + +import org.scalatest.FunSuite +import org.scalatest.BeforeAndAfter +import org.scalatest.concurrent.AsyncAssertions +import org.scalatest.concurrent.TimeLimitedTests +import org.scalatest.mock.EasyMockSugar +import org.scalatest.time.{Span, Seconds} + +import org.easymock.EasyMock._ +import org.easymock.EasyMock +import org.easymock.{IAnswer, IArgumentMatcher} + +import akka.actor.ActorSystem + +import spark.storage.BlockManager +import spark.storage.BlockManagerId +import spark.storage.BlockManagerMaster +import spark.{Dependency, ShuffleDependency, OneToOneDependency} +import spark.FetchFailedException +import spark.MapOutputTracker +import spark.RDD +import spark.SparkContext +import spark.SparkException +import spark.Split +import spark.TaskContext +import spark.TaskEndReason + +import spark.{FetchFailed, Success} + +class DAGSchedulerSuite extends FunSuite + with BeforeAndAfter with EasyMockSugar with TimeLimitedTests + with AsyncAssertions with spark.Logging { + + // If we crash the DAGScheduler thread, our test will probably hang. 
+ override val timeLimit = Span(5, Seconds) + + val sc: SparkContext = new SparkContext("local", "DAGSchedulerSuite") + var scheduler: DAGScheduler = null + var w: Waiter = null + val taskScheduler = mock[TaskScheduler] + val blockManagerMaster = mock[BlockManagerMaster] + var mapOutputTracker: MapOutputTracker = null + var schedulerThread: Thread = null + var schedulerException: Throwable = null + val taskSetMatchers = new HashMap[MyRDD, IArgumentMatcher] + val cacheLocations = new HashMap[(Int, Int), Seq[BlockManagerId]] + + implicit val mocks = MockObjects(taskScheduler, blockManagerMaster) + + def makeBlockManagerId(host: String): BlockManagerId = + BlockManagerId("exec-" + host, host, 12345) + + def resetExpecting(f: => Unit) { + reset(taskScheduler) + reset(blockManagerMaster) + expecting(f) + } + + before { + taskSetMatchers.clear() + cacheLocations.clear() + val actorSystem = ActorSystem("test") + mapOutputTracker = new MapOutputTracker(actorSystem, true) + resetExpecting { + taskScheduler.setListener(anyObject()) + } + whenExecuting { + scheduler = new DAGScheduler(taskScheduler, mapOutputTracker, blockManagerMaster, null) + } + w = new Waiter + schedulerException = null + schedulerThread = new Thread("DAGScheduler under test") { + override def run() { + try { + scheduler.run() + } catch { + case t: Throwable => + logError("Got exception in DAGScheduler: ", t) + schedulerException = t + } finally { + w.dismiss() + } + } + } + schedulerThread.start + logInfo("finished before") + } + + after { + logInfo("started after") + resetExpecting { + taskScheduler.stop() + } + whenExecuting { + scheduler.stop + schedulerThread.join + } + w.await() + if (schedulerException != null) { + throw new Exception("Exception caught from scheduler thread", schedulerException) + } + } + + // Type of RDD we use for testing. Note that we should never call the real RDD compute methods. + // This is a pair RDD type so it can always be used in ShuffleDependencies. + type MyRDD = RDD[(Int, Int)] + + def makeRdd( + numSplits: Int, + dependencies: List[Dependency[_]], + locations: Seq[Seq[String]] = Nil + ): MyRDD = { + val maxSplit = numSplits - 1 + return new MyRDD(sc, dependencies) { + override def compute(split: Split, context: TaskContext): Iterator[(Int, Int)] = + throw new RuntimeException("should not be reached") + override def getSplits() = (0 to maxSplit).map(i => new Split { + override def index = i + }).toArray + override def getPreferredLocations(split: Split): Seq[String] = + if (locations.isDefinedAt(split.index)) + locations(split.index) + else + Nil + override def toString: String = "DAGSchedulerSuiteRDD " + id + } + } + + def taskSetForRdd(rdd: MyRDD): TaskSet = { + val matcher = taskSetMatchers.getOrElseUpdate(rdd, + new IArgumentMatcher { + override def matches(actual: Any): Boolean = { + val taskSet = actual.asInstanceOf[TaskSet] + taskSet.tasks(0) match { + case rt: ResultTask[_, _] => rt.rdd.id == rdd.id + case smt: ShuffleMapTask => smt.rdd.id == rdd.id + case _ => false + } + } + override def appendTo(buf: StringBuffer) { + buf.append("taskSetForRdd(" + rdd + ")") + } + }) + EasyMock.reportMatcher(matcher) + return null + } + + def expectGetLocations(): Unit = { + EasyMock.expect(blockManagerMaster.getLocations(anyObject().asInstanceOf[Array[String]])). 
+ andAnswer(new IAnswer[Seq[Seq[BlockManagerId]]] { + override def answer(): Seq[Seq[BlockManagerId]] = { + val blocks = getCurrentArguments()(0).asInstanceOf[Array[String]] + return blocks.map { name => + val pieces = name.split("_") + if (pieces(0) == "rdd") { + val key = pieces(1).toInt -> pieces(2).toInt + if (cacheLocations.contains(key)) { + cacheLocations(key) + } else { + Seq[BlockManagerId]() + } + } else { + Seq[BlockManagerId]() + } + }.toSeq + } + }).anyTimes() + } + + def expectStageAnd(rdd: MyRDD, results: Seq[(TaskEndReason, Any)], + preferredLocations: Option[Seq[Seq[String]]] = None)(afterSubmit: TaskSet => Unit) { + // TODO: Remember which submission + EasyMock.expect(taskScheduler.submitTasks(taskSetForRdd(rdd))).andAnswer(new IAnswer[Unit] { + override def answer(): Unit = { + val taskSet = getCurrentArguments()(0).asInstanceOf[TaskSet] + for (task <- taskSet.tasks) { + task.generation = mapOutputTracker.getGeneration + } + afterSubmit(taskSet) + preferredLocations match { + case None => + for (taskLocs <- taskSet.tasks.map(_.preferredLocations)) { + w { assert(taskLocs.size === 0) } + } + case Some(locations) => + w { assert(locations.size === taskSet.tasks.size) } + for ((expectLocs, taskLocs) <- + taskSet.tasks.map(_.preferredLocations).zip(locations)) { + w { assert(expectLocs === taskLocs) } + } + } + w { assert(taskSet.tasks.size >= results.size)} + for ((result, i) <- results.zipWithIndex) { + if (i < taskSet.tasks.size) { + scheduler.taskEnded(taskSet.tasks(i), result._1, result._2, Map[Long, Any]()) + } + } + } + }) + } + + def expectStage(rdd: MyRDD, results: Seq[(TaskEndReason, Any)], + preferredLocations: Option[Seq[Seq[String]]] = None) { + expectStageAnd(rdd, results, preferredLocations) { _ => } + } + + def submitRdd(rdd: MyRDD, allowLocal: Boolean = false): Array[Int] = { + return scheduler.runJob[(Int, Int), Int]( + rdd, + (context: TaskContext, it: Iterator[(Int, Int)]) => it.next._1.asInstanceOf[Int], + (0 to (rdd.splits.size - 1)), + "test-site", + allowLocal + ) + } + + def makeMapStatus(host: String, reduces: Int): MapStatus = + new MapStatus(makeBlockManagerId(host), Array.fill[Byte](reduces)(2)) + + test("zero split job") { + val rdd = makeRdd(0, Nil) + resetExpecting { + expectGetLocations() + // deliberately expect no stages to be submitted + } + whenExecuting { + assert(submitRdd(rdd) === Array[Int]()) + } + } + + test("run trivial job") { + val rdd = makeRdd(1, Nil) + resetExpecting { + expectGetLocations() + expectStage(rdd, List( (Success, 42) )) + } + whenExecuting { + assert(submitRdd(rdd) === Array(42)) + } + } + + test("local job") { + val rdd = new MyRDD(sc, Nil) { + override def compute(split: Split, context: TaskContext): Iterator[(Int, Int)] = + Array(42 -> 0).iterator + override def getSplits() = Array( new Split { override def index = 0 } ) + override def getPreferredLocations(split: Split) = Nil + override def toString = "DAGSchedulerSuite Local RDD" + } + resetExpecting { + expectGetLocations() + // deliberately expect no stages to be submitted + } + whenExecuting { + assert(submitRdd(rdd, true) === Array(42)) + } + } + + test("run trivial job w/ dependency") { + val baseRdd = makeRdd(1, Nil) + val finalRdd = makeRdd(1, List(new OneToOneDependency(baseRdd))) + resetExpecting { + expectGetLocations() + expectStage(finalRdd, List( (Success, 42) )) + } + whenExecuting { + assert(submitRdd(finalRdd) === Array(42)) + } + } + + test("location preferences w/ dependency") { + val baseRdd = makeRdd(1, Nil) + val finalRdd = makeRdd(1, 
List(new OneToOneDependency(baseRdd))) + resetExpecting { + expectGetLocations() + cacheLocations(baseRdd.id -> 0) = + Seq(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")) + expectStage(finalRdd, List( (Success, 42) ), + Some(List(Seq("hostA", "hostB")))) + } + whenExecuting { + assert(submitRdd(finalRdd) === Array(42)) + } + } + + test("trivial job failure") { + val rdd = makeRdd(1, Nil) + resetExpecting { + expectGetLocations() + expectStageAnd(rdd, List()) { taskSet => scheduler.taskSetFailed(taskSet, "test failure") } + } + whenExecuting(taskScheduler, blockManagerMaster) { + intercept[SparkException] { submitRdd(rdd) } + } + } + + test("run trivial shuffle") { + val shuffleMapRdd = makeRdd(2, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) + val shuffleId = shuffleDep.shuffleId + val reduceRdd = makeRdd(1, List(shuffleDep)) + + resetExpecting { + expectGetLocations() + expectStage(shuffleMapRdd, List( + (Success, makeMapStatus("hostA", 1)), + (Success, makeMapStatus("hostB", 1)) + )) + expectStageAnd(reduceRdd, List( (Success, 42) )) { _ => + w { assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === + Array(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))) } + } + } + whenExecuting { + assert(submitRdd(reduceRdd) === Array(42)) + } + } + + test("run trivial shuffle with fetch failure") { + val shuffleMapRdd = makeRdd(2, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) + val shuffleId = shuffleDep.shuffleId + val reduceRdd = makeRdd(2, List(shuffleDep)) + + resetExpecting { + expectGetLocations() + expectStage(shuffleMapRdd, List( + (Success, makeMapStatus("hostA", 1)), + (Success, makeMapStatus("hostB", 1)) + )) + blockManagerMaster.removeExecutor("exec-hostA") + expectStage(reduceRdd, List( + (Success, 42), + (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0), null) + )) + // partial recompute + expectStage(shuffleMapRdd, List( (Success, makeMapStatus("hostA", 1)) )) + expectStageAnd(reduceRdd, List( (Success, 43) )) { _ => + w { assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === + Array(makeBlockManagerId("hostA"), + makeBlockManagerId("hostB"))) } + } + } + whenExecuting { + assert(submitRdd(reduceRdd) === Array(42, 43)) + } + } + + test("ignore late map task completions") { + val shuffleMapRdd = makeRdd(2, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) + val shuffleId = shuffleDep.shuffleId + val reduceRdd = makeRdd(2, List(shuffleDep)) + + resetExpecting { + expectGetLocations() + expectStageAnd(shuffleMapRdd, List( + (Success, makeMapStatus("hostA", 1)) + )) { taskSet => + val newGeneration = mapOutputTracker.getGeneration + 1 + scheduler.executorLost("exec-hostA") + val noAccum = Map[Long, Any]() + // We rely on the event queue being ordered and increasing the generation number by 1 + // should be ignored for being too old + scheduler.taskEnded(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum) + // should work because it's a non-failed host + scheduler.taskEnded(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), noAccum) + // should be ignored for being too old + scheduler.taskEnded(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum) + // should be ignored (not end the stage) because it's too old + scheduler.taskEnded(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), noAccum) + taskSet.tasks(1).generation = newGeneration + scheduler.taskEnded(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), noAccum) + } + 
blockManagerMaster.removeExecutor("exec-hostA") + expectStageAnd(reduceRdd, List( + (Success, 42), (Success, 43) + )) { _ => + w { assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === + Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA"))) } + } + } + whenExecuting { + assert(submitRdd(reduceRdd) === Array(42, 43)) + } + } + + test("run trivial shuffle with out-of-band failure") { + val shuffleMapRdd = makeRdd(2, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) + val shuffleId = shuffleDep.shuffleId + val reduceRdd = makeRdd(1, List(shuffleDep)) + resetExpecting { + expectGetLocations() + blockManagerMaster.removeExecutor("exec-hostA") + expectStageAnd(shuffleMapRdd, List( + (Success, makeMapStatus("hostA", 1)), + (Success, makeMapStatus("hostB", 1)) + )) { _ => scheduler.executorLost("exec-hostA") } + expectStage(shuffleMapRdd, List( + (Success, makeMapStatus("hostC", 1)) + )) + expectStageAnd(reduceRdd, List( (Success, 42) )) { _ => + w { assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === + Array(makeBlockManagerId("hostC"), + makeBlockManagerId("hostB"))) } + } + } + whenExecuting { + assert(submitRdd(reduceRdd) === Array(42)) + } + } + + test("recursive shuffle failures") { + val shuffleOneRdd = makeRdd(2, Nil) + val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null) + val shuffleTwoRdd = makeRdd(2, List(shuffleDepOne)) + val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null) + val finalRdd = makeRdd(1, List(shuffleDepTwo)) + + resetExpecting { + expectGetLocations() + expectStage(shuffleOneRdd, List( + (Success, makeMapStatus("hostA", 1)), + (Success, makeMapStatus("hostB", 1)) + )) + expectStage(shuffleTwoRdd, List( + (Success, makeMapStatus("hostA", 1)), + (Success, makeMapStatus("hostC", 1)) + )) + blockManagerMaster.removeExecutor("exec-hostA") + expectStage(finalRdd, List( + (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null) + )) + // triggers a partial recompute of the first stage, then the second + expectStage(shuffleOneRdd, List( + (Success, makeMapStatus("hostA", 1)) + )) + expectStage(shuffleTwoRdd, List( + (Success, makeMapStatus("hostA", 1)) + )) + expectStage(finalRdd, List( + (Success, 42) + )) + } + whenExecuting { + assert(submitRdd(finalRdd) === Array(42)) + } + } + + test("cached post-shuffle") { + val shuffleOneRdd = makeRdd(2, Nil) + val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null) + val shuffleTwoRdd = makeRdd(2, List(shuffleDepOne)) + val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null) + val finalRdd = makeRdd(1, List(shuffleDepTwo)) + + resetExpecting { + expectGetLocations() + expectStage(shuffleOneRdd, List( + (Success, makeMapStatus("hostA", 1)), + (Success, makeMapStatus("hostB", 1)) + )) + expectStageAnd(shuffleTwoRdd, List( + (Success, makeMapStatus("hostA", 1)), + (Success, makeMapStatus("hostC", 1)) + )){ _ => + cacheLocations(shuffleTwoRdd.id -> 0) = Seq(makeBlockManagerId("hostD")) + cacheLocations(shuffleTwoRdd.id -> 1) = Seq(makeBlockManagerId("hostC")) + } + blockManagerMaster.removeExecutor("exec-hostA") + expectStage(finalRdd, List( + (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null) + )) + // since we have a cached copy of the missing split of shuffleTwoRdd, we shouldn't + // immediately try to rerun shuffleOneRdd: + expectStage(shuffleTwoRdd, List( + (Success, makeMapStatus("hostD", 1)) + ), Some(Seq(List("hostD")))) + expectStage(finalRdd, List( + (Success, 42) + )) + } + 
whenExecuting { + assert(submitRdd(finalRdd) === Array(42)) + } + } + + test("cached post-shuffle but fails") { + val shuffleOneRdd = makeRdd(2, Nil) + val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null) + val shuffleTwoRdd = makeRdd(2, List(shuffleDepOne)) + val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null) + val finalRdd = makeRdd(1, List(shuffleDepTwo)) + + resetExpecting { + expectGetLocations() + expectStage(shuffleOneRdd, List( + (Success, makeMapStatus("hostA", 1)), + (Success, makeMapStatus("hostB", 1)) + )) + expectStageAnd(shuffleTwoRdd, List( + (Success, makeMapStatus("hostA", 1)), + (Success, makeMapStatus("hostC", 1)) + )){ _ => + cacheLocations(shuffleTwoRdd.id -> 0) = Seq(makeBlockManagerId("hostD")) + cacheLocations(shuffleTwoRdd.id -> 1) = Seq(makeBlockManagerId("hostC")) + } + blockManagerMaster.removeExecutor("exec-hostA") + expectStage(finalRdd, List( + (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null) + )) + // since we have a cached copy of the missing split of shuffleTwoRdd, we shouldn't + // immediately try to rerun shuffleOneRdd: + expectStageAnd(shuffleTwoRdd, List( + (FetchFailed(null, shuffleDepOne.shuffleId, 0, 0), null) + ), Some(Seq(List("hostD")))) { _ => + w { + intercept[FetchFailedException]{ + mapOutputTracker.getServerStatuses(shuffleDepOne.shuffleId, 0) + } + } + cacheLocations.remove(shuffleTwoRdd.id -> 0) + } + // after that fetch failure, we should refetch the cache locations and try to recompute + // the whole chain. Note that we will ignore that a fetch failure previously occured on + // this host. + expectStage(shuffleOneRdd, List( (Success, makeMapStatus("hostA", 1)) )) + expectStage(shuffleTwoRdd, List( (Success, makeMapStatus("hostA", 1)) )) + expectStage(finalRdd, List( (Success, 42) )) + } + whenExecuting { + assert(submitRdd(finalRdd) === Array(42)) + } + } +} + -- cgit v1.2.3 From 4bf3d7ea1252454ca584a3dabf26bdeab4069409 Mon Sep 17 00:00:00 2001 From: Charles Reiss Date: Tue, 29 Jan 2013 19:05:45 -0800 Subject: Clear spark.master.port to cleanup for other tests --- core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala index 53f5214d7a..6c577c2685 100644 --- a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala @@ -102,6 +102,7 @@ class DAGSchedulerSuite extends FunSuite if (schedulerException != null) { throw new Exception("Exception caught from scheduler thread", schedulerException) } + System.clearProperty("spark.master.port") } // Type of RDD we use for testing. Note that we should never call the real RDD compute methods. -- cgit v1.2.3 From 178b89204c9dbee36886e757ddaafbd079672f4a Mon Sep 17 00:00:00 2001 From: Charles Reiss Date: Wed, 30 Jan 2013 09:19:55 -0800 Subject: Refactor DAGScheduler more to allow testing without a separate thread. 
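
The heart of this refactoring is pulling the body of the event loop out of run(): processEvent handles exactly one DAGSchedulerEvent and returns whether the loop should stop, resubmitFailedStages and submitWaitingStages become named methods, and run() keeps only the polling and timing logic. That lets a test drive the scheduler one event at a time without ever starting the thread. A minimal sketch of the resulting shape, using made-up Event types rather than the real DAGSchedulerEvent hierarchy:

    import java.util.concurrent.LinkedBlockingQueue

    sealed trait Event
    case class Submit(id: Int) extends Event
    case object Stop extends Event

    class EventLoop {
      val eventQueue = new LinkedBlockingQueue[Event]
      var submitted = List[Int]()

      // Handles a single event; returns true when the loop should exit.
      // This is the piece a test can call directly, with no thread involved.
      def processEvent(event: Event): Boolean = event match {
        case Submit(id) =>
          submitted = id :: submitted
          false
        case Stop =>
          true
      }

      // The thread's run() body shrinks to polling plus delegation.
      def run() {
        while (true) {
          if (processEvent(eventQueue.take())) return
        }
      }
    }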
--- .../main/scala/spark/scheduler/DAGScheduler.scala | 176 +++++++++++++-------- 1 file changed, 111 insertions(+), 65 deletions(-) diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index 9655961162..6892509ed1 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -23,11 +23,13 @@ import util.{MetadataCleaner, TimeStampedHashMap} * and to report fetch failures (the submitTasks method, and code to add CompletionEvents). */ private[spark] -class DAGScheduler(taskSched: TaskScheduler, - mapOutputTracker: MapOutputTracker, - blockManagerMaster: BlockManagerMaster, - env: SparkEnv) - extends TaskSchedulerListener with Logging { +class DAGScheduler( + taskSched: TaskScheduler, + mapOutputTracker: MapOutputTracker, + blockManagerMaster: BlockManagerMaster, + env: SparkEnv) + extends TaskSchedulerListener with Logging { + def this(taskSched: TaskScheduler) { this(taskSched, SparkEnv.get.mapOutputTracker, SparkEnv.get.blockManager.master, SparkEnv.get) } @@ -203,6 +205,27 @@ class DAGScheduler(taskSched: TaskScheduler, missing.toList } + /** Returns (and does not) submit a JobSubmitted event suitable to run a given job, and + * a JobWaiter whose getResult() method will return the result of the job when it is complete. + * + * The job is assumed to have at least one partition; zero partition jobs should be handled + * without a JobSubmitted event. + */ + private[scheduler] def prepareJob[T, U: ClassManifest]( + finalRdd: RDD[T], + func: (TaskContext, Iterator[T]) => U, + partitions: Seq[Int], + callSite: String, + allowLocal: Boolean) + : (JobSubmitted, JobWaiter) = + { + assert(partitions.size > 0) + val waiter = new JobWaiter(partitions.size) + val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] + val toSubmit = JobSubmitted(finalRdd, func2, partitions.toArray, allowLocal, callSite, waiter) + return (toSubmit, waiter) + } + def runJob[T, U: ClassManifest]( finalRdd: RDD[T], func: (TaskContext, Iterator[T]) => U, @@ -214,9 +237,8 @@ class DAGScheduler(taskSched: TaskScheduler, if (partitions.size == 0) { return new Array[U](0) } - val waiter = new JobWaiter(partitions.size) - val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] - eventQueue.put(JobSubmitted(finalRdd, func2, partitions.toArray, allowLocal, callSite, waiter)) + val (toSubmit, waiter) = prepareJob(finalRdd, func, partitions, callSite, allowLocal) + eventQueue.put(toSubmit) waiter.getResult() match { case JobSucceeded(results: Seq[_]) => return results.asInstanceOf[Seq[U]].toArray @@ -241,6 +263,81 @@ class DAGScheduler(taskSched: TaskScheduler, return listener.getResult() // Will throw an exception if the job fails } + /** Process one event retrieved from the event queue. + * Returns true if we should stop the event loop. 
+ */ + private[scheduler] def processEvent(event: DAGSchedulerEvent): Boolean = { + event match { + case JobSubmitted(finalRDD, func, partitions, allowLocal, callSite, listener) => + val runId = nextRunId.getAndIncrement() + val finalStage = newStage(finalRDD, None, runId) + val job = new ActiveJob(runId, finalStage, func, partitions, callSite, listener) + clearCacheLocs() + logInfo("Got job " + job.runId + " (" + callSite + ") with " + partitions.length + + " output partitions (allowLocal=" + allowLocal + ")") + logInfo("Final stage: " + finalStage + " (" + finalStage.origin + ")") + logInfo("Parents of final stage: " + finalStage.parents) + logInfo("Missing parents: " + getMissingParentStages(finalStage)) + if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) { + // Compute very short actions like first() or take() with no parent stages locally. + runLocally(job) + } else { + activeJobs += job + resultStageToJob(finalStage) = job + submitStage(finalStage) + } + + case ExecutorLost(execId) => + handleExecutorLost(execId) + + case completion: CompletionEvent => + handleTaskCompletion(completion) + + case TaskSetFailed(taskSet, reason) => + abortStage(idToStage(taskSet.stageId), reason) + + case StopDAGScheduler => + // Cancel any active jobs + for (job <- activeJobs) { + val error = new SparkException("Job cancelled because SparkContext was shut down") + job.listener.jobFailed(error) + } + return true + } + return false + } + + /** Resubmit any failed stages. Ordinarily called after a small amount of time has passed since + * the last fetch failure. + */ + private[scheduler] def resubmitFailedStages() { + logInfo("Resubmitting failed stages") + clearCacheLocs() + val failed2 = failed.toArray + failed.clear() + for (stage <- failed2.sortBy(_.priority)) { + submitStage(stage) + } + } + + /** Check for waiting or failed stages which are now eligible for resubmission. + * Ordinarily run on every iteration of the event loop. + */ + private[scheduler] def submitWaitingStages() { + // TODO: We might want to run this less often, when we are sure that something has become + // runnable that wasn't before. + logTrace("Checking for newly runnable parent stages") + logTrace("running: " + running) + logTrace("waiting: " + waiting) + logTrace("failed: " + failed) + val waiting2 = waiting.toArray + waiting.clear() + for (stage <- waiting2.sortBy(_.priority)) { + submitStage(stage) + } + } + + /** * The main event loop of the DAG scheduler, which waits for new-job / task-finished / failure * events and responds by launching tasks. 
This runs in a dedicated thread and receives events @@ -251,77 +348,26 @@ class DAGScheduler(taskSched: TaskScheduler, while (true) { val event = eventQueue.poll(POLL_TIMEOUT, TimeUnit.MILLISECONDS) - val time = System.currentTimeMillis() // TODO: use a pluggable clock for testability if (event != null) { logDebug("Got event of type " + event.getClass.getName) } - event match { - case JobSubmitted(finalRDD, func, partitions, allowLocal, callSite, listener) => - val runId = nextRunId.getAndIncrement() - val finalStage = newStage(finalRDD, None, runId) - val job = new ActiveJob(runId, finalStage, func, partitions, callSite, listener) - clearCacheLocs() - logInfo("Got job " + job.runId + " (" + callSite + ") with " + partitions.length + - " output partitions") - logInfo("Final stage: " + finalStage + " (" + finalStage.origin + ")") - logInfo("Parents of final stage: " + finalStage.parents) - logInfo("Missing parents: " + getMissingParentStages(finalStage)) - if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) { - // Compute very short actions like first() or take() with no parent stages locally. - runLocally(job) - } else { - activeJobs += job - resultStageToJob(finalStage) = job - submitStage(finalStage) - } - - case ExecutorLost(execId) => - handleExecutorLost(execId) - - case completion: CompletionEvent => - handleTaskCompletion(completion) - - case TaskSetFailed(taskSet, reason) => - abortStage(idToStage(taskSet.stageId), reason) - - case StopDAGScheduler => - // Cancel any active jobs - for (job <- activeJobs) { - val error = new SparkException("Job cancelled because SparkContext was shut down") - job.listener.jobFailed(error) - } + if (event != null) { + if (processEvent(event)) { return - - case null => - // queue.poll() timed out, ignore it + } } + val time = System.currentTimeMillis() // TODO: use a pluggable clock for testability // Periodically resubmit failed stages if some map output fetches have failed and we have // waited at least RESUBMIT_TIMEOUT. We wait for this short time because when a node fails, // tasks on many other nodes are bound to get a fetch failure, and they won't all get it at // the same time, so we want to make sure we've identified all the reduce tasks that depend // on the failed node. if (failed.size > 0 && time > lastFetchFailureTime + RESUBMIT_TIMEOUT) { - logInfo("Resubmitting failed stages") - clearCacheLocs() - val failed2 = failed.toArray - failed.clear() - for (stage <- failed2.sortBy(_.priority)) { - submitStage(stage) - } + resubmitFailedStages } else { - // TODO: We might want to run this less often, when we are sure that something has become - // runnable that wasn't before. - logTrace("Checking for newly runnable parent stages") - logTrace("running: " + running) - logTrace("waiting: " + waiting) - logTrace("failed: " + failed) - val waiting2 = waiting.toArray - waiting.clear() - for (stage <- waiting2.sortBy(_.priority)) { - submitStage(stage) - } + submitWaitingStages } } } -- cgit v1.2.3 From 9c0bae75ade9e5b5a69077a5719adf4ee96e2c2e Mon Sep 17 00:00:00 2001 From: Charles Reiss Date: Wed, 30 Jan 2013 09:22:07 -0800 Subject: Change DAGSchedulerSuite to run DAGScheduler in the same Thread. 
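
Alongside moving the scheduler into the test's thread, this change drops the Waiter/AsyncAssertions machinery and starts using EasyMock's Capture: submitTasks is expected with and(capture(...), taskSetForRdd(rdd)), so interceptStage can hand the captured TaskSet back to the test for direct assertions. A stand-alone sketch of just the Capture mechanism, with a made-up Greeter interface (the rest is the stock EasyMock 3.1 API already on the test classpath):

    import org.easymock.{Capture, EasyMock}

    trait Greeter { def greet(name: String): Unit }

    object CaptureSketch extends App {
      val greeter = EasyMock.createMock(classOf[Greeter])
      val seen = new Capture[String]()
      // Record an expectation whose argument matcher also stores the actual value.
      greeter.greet(EasyMock.capture(seen))
      EasyMock.replay(greeter)

      greeter.greet("hostA")

      EasyMock.verify(greeter)
      assert(seen.getValue == "hostA")  // the captured argument is now inspectable
    }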
--- .../scala/spark/scheduler/DAGSchedulerSuite.scala | 568 ++++++++++++--------- 1 file changed, 319 insertions(+), 249 deletions(-) diff --git a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala index 6c577c2685..89173540d4 100644 --- a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala @@ -4,12 +4,12 @@ import scala.collection.mutable.{Map, HashMap} import org.scalatest.FunSuite import org.scalatest.BeforeAndAfter -import org.scalatest.concurrent.AsyncAssertions import org.scalatest.concurrent.TimeLimitedTests import org.scalatest.mock.EasyMockSugar import org.scalatest.time.{Span, Seconds} import org.easymock.EasyMock._ +import org.easymock.Capture import org.easymock.EasyMock import org.easymock.{IAnswer, IArgumentMatcher} @@ -30,33 +30,55 @@ import spark.TaskEndReason import spark.{FetchFailed, Success} -class DAGSchedulerSuite extends FunSuite - with BeforeAndAfter with EasyMockSugar with TimeLimitedTests - with AsyncAssertions with spark.Logging { +class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar with TimeLimitedTests { - // If we crash the DAGScheduler thread, our test will probably hang. + // impose a time limit on this test in case we don't let the job finish. override val timeLimit = Span(5, Seconds) val sc: SparkContext = new SparkContext("local", "DAGSchedulerSuite") var scheduler: DAGScheduler = null - var w: Waiter = null val taskScheduler = mock[TaskScheduler] val blockManagerMaster = mock[BlockManagerMaster] var mapOutputTracker: MapOutputTracker = null var schedulerThread: Thread = null var schedulerException: Throwable = null + + /** Set of EasyMock argument matchers that match a TaskSet for a given RDD. + * We cache these so we do not create duplicate matchers for the same RDD. + * This allows us to easily setup a sequence of expectations for task sets for + * that RDD. + */ val taskSetMatchers = new HashMap[MyRDD, IArgumentMatcher] + + /** Set of cache locations to return from our mock BlockManagerMaster. + * Keys are (rdd ID, partition ID). Anything not present will return an empty + * list of cache locations silently. + */ val cacheLocations = new HashMap[(Int, Int), Seq[BlockManagerId]] - implicit val mocks = MockObjects(taskScheduler, blockManagerMaster) + /** JobWaiter for the last JobSubmitted event we pushed. To keep tests (most of which + * will only submit one job) from needing to explicitly track it. + */ + var lastJobWaiter: JobWaiter = null - def makeBlockManagerId(host: String): BlockManagerId = - BlockManagerId("exec-" + host, host, 12345) + /** Tell EasyMockSugar what mock objects we want to be configured by expecting {...} + * and whenExecuting {...} */ + implicit val mocks = MockObjects(taskScheduler, blockManagerMaster) + /** Utility function to reset mocks and set expectations on them. EasyMock wants mock objects + * to be reset after each time their expectations are set, and we tend to check mock object + * calls over a single call to DAGScheduler. + * + * We also set a default expectation here that blockManagerMaster.getLocations can be called + * and will return values from cacheLocations. 
+ */ def resetExpecting(f: => Unit) { reset(taskScheduler) reset(blockManagerMaster) - expecting(f) + expecting { + expectGetLocations() + f + } } before { @@ -70,45 +92,30 @@ class DAGSchedulerSuite extends FunSuite whenExecuting { scheduler = new DAGScheduler(taskScheduler, mapOutputTracker, blockManagerMaster, null) } - w = new Waiter - schedulerException = null - schedulerThread = new Thread("DAGScheduler under test") { - override def run() { - try { - scheduler.run() - } catch { - case t: Throwable => - logError("Got exception in DAGScheduler: ", t) - schedulerException = t - } finally { - w.dismiss() - } - } - } - schedulerThread.start - logInfo("finished before") } after { - logInfo("started after") + assert(scheduler.processEvent(StopDAGScheduler)) resetExpecting { taskScheduler.stop() } whenExecuting { - scheduler.stop - schedulerThread.join - } - w.await() - if (schedulerException != null) { - throw new Exception("Exception caught from scheduler thread", schedulerException) + scheduler.stop() } System.clearProperty("spark.master.port") } - // Type of RDD we use for testing. Note that we should never call the real RDD compute methods. - // This is a pair RDD type so it can always be used in ShuffleDependencies. + def makeBlockManagerId(host: String): BlockManagerId = + BlockManagerId("exec-" + host, host, 12345) + + /** Type of RDD we use for testing. Note that we should never call the real RDD compute methods. + * This is a pair RDD type so it can always be used in ShuffleDependencies. */ type MyRDD = RDD[(Int, Int)] + /** Create an RDD for passing to DAGScheduler. These RDDs will use the dependencies and + * preferredLocations (if any) that are passed to them. They are deliberately not executable + * so we can test that DAGScheduler does not try to execute RDDs locally. + */ def makeRdd( numSplits: Int, dependencies: List[Dependency[_]], @@ -130,6 +137,9 @@ class DAGSchedulerSuite extends FunSuite } } + /** EasyMock matcher method. For use as an argument matcher for a TaskSet whose first task + * is from a particular RDD. + */ def taskSetForRdd(rdd: MyRDD): TaskSet = { val matcher = taskSetMatchers.getOrElseUpdate(rdd, new IArgumentMatcher { @@ -149,6 +159,9 @@ class DAGSchedulerSuite extends FunSuite return null } + /** Setup an EasyMock expectation to repsond to blockManagerMaster.getLocations() called from + * cacheLocations. + */ def expectGetLocations(): Unit = { EasyMock.expect(blockManagerMaster.getLocations(anyObject().asInstanceOf[Array[String]])). 
andAnswer(new IAnswer[Seq[Seq[BlockManagerId]]] { @@ -171,51 +184,106 @@ class DAGSchedulerSuite extends FunSuite }).anyTimes() } - def expectStageAnd(rdd: MyRDD, results: Seq[(TaskEndReason, Any)], - preferredLocations: Option[Seq[Seq[String]]] = None)(afterSubmit: TaskSet => Unit) { - // TODO: Remember which submission - EasyMock.expect(taskScheduler.submitTasks(taskSetForRdd(rdd))).andAnswer(new IAnswer[Unit] { - override def answer(): Unit = { - val taskSet = getCurrentArguments()(0).asInstanceOf[TaskSet] - for (task <- taskSet.tasks) { - task.generation = mapOutputTracker.getGeneration - } - afterSubmit(taskSet) - preferredLocations match { - case None => - for (taskLocs <- taskSet.tasks.map(_.preferredLocations)) { - w { assert(taskLocs.size === 0) } - } - case Some(locations) => - w { assert(locations.size === taskSet.tasks.size) } - for ((expectLocs, taskLocs) <- - taskSet.tasks.map(_.preferredLocations).zip(locations)) { - w { assert(expectLocs === taskLocs) } - } - } - w { assert(taskSet.tasks.size >= results.size)} - for ((result, i) <- results.zipWithIndex) { - if (i < taskSet.tasks.size) { - scheduler.taskEnded(taskSet.tasks(i), result._1, result._2, Map[Long, Any]()) - } - } + /** Process the supplied event as if it were the top of the DAGScheduler event queue, expecting + * the scheduler not to exit. + * + * After processing the event, submit waiting stages as is done on most iterations of the + * DAGScheduler event loop. + */ + def runEvent(event: DAGSchedulerEvent) { + assert(!scheduler.processEvent(event)) + scheduler.submitWaitingStages() + } + + /** Expect a TaskSet for the specified RDD to be submitted to the TaskScheduler. Should be + * called from a resetExpecting { ... } block. + * + * Returns a easymock Capture that will contain the task set after the stage is submitted. + * Most tests should use interceptStage() instead of this directly. + */ + def expectStage(rdd: MyRDD): Capture[TaskSet] = { + val taskSetCapture = new Capture[TaskSet] + taskScheduler.submitTasks(and(capture(taskSetCapture), taskSetForRdd(rdd))) + return taskSetCapture + } + + /** Expect the supplied code snippet to submit a stage for the specified RDD. + * Return the resulting TaskSet. First marks all the tasks are belonging to the + * current MapOutputTracker generation. + */ + def interceptStage(rdd: MyRDD)(f: => Unit): TaskSet = { + var capture: Capture[TaskSet] = null + resetExpecting { + capture = expectStage(rdd) + } + whenExecuting { + f + } + val taskSet = capture.getValue + for (task <- taskSet.tasks) { + task.generation = mapOutputTracker.getGeneration + } + return taskSet + } + + /** Send the given CompletionEvent messages for the tasks in the TaskSet. */ + def respondToTaskSet(taskSet: TaskSet, results: Seq[(TaskEndReason, Any)]) { + assert(taskSet.tasks.size >= results.size) + for ((result, i) <- results.zipWithIndex) { + if (i < taskSet.tasks.size) { + runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, Map[Long, Any]())) } - }) + } } - def expectStage(rdd: MyRDD, results: Seq[(TaskEndReason, Any)], - preferredLocations: Option[Seq[Seq[String]]] = None) { - expectStageAnd(rdd, results, preferredLocations) { _ => } + /** Assert that the supplied TaskSet has exactly the given preferredLocations. 
*/ + def expectTaskSetLocations(taskSet: TaskSet, locations: Seq[Seq[String]]) { + assert(locations.size === taskSet.tasks.size) + for ((expectLocs, taskLocs) <- + taskSet.tasks.map(_.preferredLocations).zip(locations)) { + assert(expectLocs === taskLocs) + } } - def submitRdd(rdd: MyRDD, allowLocal: Boolean = false): Array[Int] = { - return scheduler.runJob[(Int, Int), Int]( + /** When we submit dummy Jobs, this is the compute function we supply. Except in a local test + * below, we do not expect this function to ever be executed; instead, we will return results + * directly through CompletionEvents. + */ + def jobComputeFunc(context: TaskContext, it: Iterator[(Int, Int)]): Int = + it.next._1.asInstanceOf[Int] + + + /** Start a job to compute the given RDD. Returns the JobWaiter that will + * collect the result of the job via callbacks from DAGScheduler. */ + def submitRdd(rdd: MyRDD, allowLocal: Boolean = false): JobWaiter = { + val (toSubmit, waiter) = scheduler.prepareJob[(Int, Int), Int]( rdd, - (context: TaskContext, it: Iterator[(Int, Int)]) => it.next._1.asInstanceOf[Int], + jobComputeFunc, (0 to (rdd.splits.size - 1)), "test-site", allowLocal ) + lastJobWaiter = waiter + runEvent(toSubmit) + return waiter + } + + /** Assert that a job we started has failed. */ + def expectJobException(waiter: JobWaiter = lastJobWaiter) { + waiter.getResult match { + case JobSucceeded(_) => fail() + case JobFailed(_) => return + } + } + + /** Assert that a job we started has succeeded and has the given result. */ + def expectJobResult(expected: Array[Int], waiter: JobWaiter = lastJobWaiter) { + waiter.getResult match { + case JobSucceeded(answer) => + assert(expected === answer.asInstanceOf[Seq[Int]].toArray ) + case JobFailed(_) => + fail() + } } def makeMapStatus(host: String, reduces: Int): MapStatus = @@ -223,24 +291,14 @@ class DAGSchedulerSuite extends FunSuite test("zero split job") { val rdd = makeRdd(0, Nil) - resetExpecting { - expectGetLocations() - // deliberately expect no stages to be submitted - } - whenExecuting { - assert(submitRdd(rdd) === Array[Int]()) - } + assert(scheduler.runJob(rdd, jobComputeFunc, Seq(), "test-site", false) === Array[Int]()) } test("run trivial job") { val rdd = makeRdd(1, Nil) - resetExpecting { - expectGetLocations() - expectStage(rdd, List( (Success, 42) )) - } - whenExecuting { - assert(submitRdd(rdd) === Array(42)) - } + val taskSet = interceptStage(rdd) { submitRdd(rdd) } + respondToTaskSet(taskSet, List( (Success, 42) )) + expectJobResult(Array(42)) } test("local job") { @@ -251,51 +309,34 @@ class DAGSchedulerSuite extends FunSuite override def getPreferredLocations(split: Split) = Nil override def toString = "DAGSchedulerSuite Local RDD" } - resetExpecting { - expectGetLocations() - // deliberately expect no stages to be submitted - } - whenExecuting { - assert(submitRdd(rdd, true) === Array(42)) - } + submitRdd(rdd, true) + expectJobResult(Array(42)) } test("run trivial job w/ dependency") { val baseRdd = makeRdd(1, Nil) val finalRdd = makeRdd(1, List(new OneToOneDependency(baseRdd))) - resetExpecting { - expectGetLocations() - expectStage(finalRdd, List( (Success, 42) )) - } - whenExecuting { - assert(submitRdd(finalRdd) === Array(42)) - } + val taskSet = interceptStage(finalRdd) { submitRdd(finalRdd) } + respondToTaskSet(taskSet, List( (Success, 42) )) + expectJobResult(Array(42)) } - test("location preferences w/ dependency") { + test("cache location preferences w/ dependency") { val baseRdd = makeRdd(1, Nil) val finalRdd = makeRdd(1, List(new 
OneToOneDependency(baseRdd))) - resetExpecting { - expectGetLocations() - cacheLocations(baseRdd.id -> 0) = - Seq(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")) - expectStage(finalRdd, List( (Success, 42) ), - Some(List(Seq("hostA", "hostB")))) - } - whenExecuting { - assert(submitRdd(finalRdd) === Array(42)) - } + cacheLocations(baseRdd.id -> 0) = + Seq(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")) + val taskSet = interceptStage(finalRdd) { submitRdd(finalRdd) } + expectTaskSetLocations(taskSet, List(Seq("hostA", "hostB"))) + respondToTaskSet(taskSet, List( (Success, 42) )) + expectJobResult(Array(42)) } test("trivial job failure") { val rdd = makeRdd(1, Nil) - resetExpecting { - expectGetLocations() - expectStageAnd(rdd, List()) { taskSet => scheduler.taskSetFailed(taskSet, "test failure") } - } - whenExecuting(taskScheduler, blockManagerMaster) { - intercept[SparkException] { submitRdd(rdd) } - } + val taskSet = interceptStage(rdd) { submitRdd(rdd) } + runEvent(TaskSetFailed(taskSet, "test failure")) + expectJobException() } test("run trivial shuffle") { @@ -304,20 +345,17 @@ class DAGSchedulerSuite extends FunSuite val shuffleId = shuffleDep.shuffleId val reduceRdd = makeRdd(1, List(shuffleDep)) - resetExpecting { - expectGetLocations() - expectStage(shuffleMapRdd, List( + val firstStage = interceptStage(shuffleMapRdd) { submitRdd(reduceRdd) } + val secondStage = interceptStage(reduceRdd) { + respondToTaskSet(firstStage, List( (Success, makeMapStatus("hostA", 1)), (Success, makeMapStatus("hostB", 1)) )) - expectStageAnd(reduceRdd, List( (Success, 42) )) { _ => - w { assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === - Array(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))) } - } - } - whenExecuting { - assert(submitRdd(reduceRdd) === Array(42)) } + assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === + Array(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))) + respondToTaskSet(secondStage, List( (Success, 42) )) + expectJobResult(Array(42)) } test("run trivial shuffle with fetch failure") { @@ -326,28 +364,32 @@ class DAGSchedulerSuite extends FunSuite val shuffleId = shuffleDep.shuffleId val reduceRdd = makeRdd(2, List(shuffleDep)) - resetExpecting { - expectGetLocations() - expectStage(shuffleMapRdd, List( + val firstStage = interceptStage(shuffleMapRdd) { submitRdd(reduceRdd) } + val secondStage = interceptStage(reduceRdd) { + respondToTaskSet(firstStage, List( (Success, makeMapStatus("hostA", 1)), (Success, makeMapStatus("hostB", 1)) )) + } + resetExpecting { blockManagerMaster.removeExecutor("exec-hostA") - expectStage(reduceRdd, List( + } + whenExecuting { + respondToTaskSet(secondStage, List( (Success, 42), (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0), null) )) - // partial recompute - expectStage(shuffleMapRdd, List( (Success, makeMapStatus("hostA", 1)) )) - expectStageAnd(reduceRdd, List( (Success, 43) )) { _ => - w { assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === - Array(makeBlockManagerId("hostA"), - makeBlockManagerId("hostB"))) } - } } - whenExecuting { - assert(submitRdd(reduceRdd) === Array(42, 43)) + val thirdStage = interceptStage(shuffleMapRdd) { + scheduler.resubmitFailedStages() + } + val fourthStage = interceptStage(reduceRdd) { + respondToTaskSet(thirdStage, List( (Success, makeMapStatus("hostA", 1)) )) } + assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === + Array(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))) + 
respondToTaskSet(fourthStage, List( (Success, 43) )) + expectJobResult(Array(42, 43)) } test("ignore late map task completions") { @@ -356,63 +398,64 @@ class DAGSchedulerSuite extends FunSuite val shuffleId = shuffleDep.shuffleId val reduceRdd = makeRdd(2, List(shuffleDep)) + val taskSet = interceptStage(shuffleMapRdd) { submitRdd(reduceRdd) } + val oldGeneration = mapOutputTracker.getGeneration resetExpecting { - expectGetLocations() - expectStageAnd(shuffleMapRdd, List( - (Success, makeMapStatus("hostA", 1)) - )) { taskSet => - val newGeneration = mapOutputTracker.getGeneration + 1 - scheduler.executorLost("exec-hostA") - val noAccum = Map[Long, Any]() - // We rely on the event queue being ordered and increasing the generation number by 1 - // should be ignored for being too old - scheduler.taskEnded(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum) - // should work because it's a non-failed host - scheduler.taskEnded(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), noAccum) - // should be ignored for being too old - scheduler.taskEnded(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum) - // should be ignored (not end the stage) because it's too old - scheduler.taskEnded(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), noAccum) - taskSet.tasks(1).generation = newGeneration - scheduler.taskEnded(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), noAccum) - } blockManagerMaster.removeExecutor("exec-hostA") - expectStageAnd(reduceRdd, List( - (Success, 42), (Success, 43) - )) { _ => - w { assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === - Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA"))) } - } } whenExecuting { - assert(submitRdd(reduceRdd) === Array(42, 43)) - } + runEvent(ExecutorLost("exec-hostA")) + } + val newGeneration = mapOutputTracker.getGeneration + assert(newGeneration > oldGeneration) + val noAccum = Map[Long, Any]() + // We rely on the event queue being ordered and increasing the generation number by 1 + // should be ignored for being too old + runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum)) + // should work because it's a non-failed host + runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), noAccum)) + // should be ignored for being too old + runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum)) + taskSet.tasks(1).generation = newGeneration + val secondStage = interceptStage(reduceRdd) { + runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), noAccum)) + } + assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === + Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA"))) + respondToTaskSet(secondStage, List( (Success, 42), (Success, 43) )) + expectJobResult(Array(42, 43)) } - test("run trivial shuffle with out-of-band failure") { + test("run trivial shuffle with out-of-band failure and retry") { val shuffleMapRdd = makeRdd(2, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) val shuffleId = shuffleDep.shuffleId val reduceRdd = makeRdd(1, List(shuffleDep)) + + val firstStage = interceptStage(shuffleMapRdd) { submitRdd(reduceRdd) } resetExpecting { - expectGetLocations() blockManagerMaster.removeExecutor("exec-hostA") - expectStageAnd(shuffleMapRdd, List( + } + whenExecuting { + runEvent(ExecutorLost("exec-hostA")) + } + // DAGScheduler will immediately resubmit the stage after it appears to have no pending tasks + // rather than marking it is 
as failed and waiting. + val secondStage = interceptStage(shuffleMapRdd) { + respondToTaskSet(firstStage, List( (Success, makeMapStatus("hostA", 1)), (Success, makeMapStatus("hostB", 1)) - )) { _ => scheduler.executorLost("exec-hostA") } - expectStage(shuffleMapRdd, List( - (Success, makeMapStatus("hostC", 1)) )) - expectStageAnd(reduceRdd, List( (Success, 42) )) { _ => - w { assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === - Array(makeBlockManagerId("hostC"), - makeBlockManagerId("hostB"))) } - } } - whenExecuting { - assert(submitRdd(reduceRdd) === Array(42)) + val thirdStage = interceptStage(reduceRdd) { + respondToTaskSet(secondStage, List( + (Success, makeMapStatus("hostC", 1)) + )) } + assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === + Array(makeBlockManagerId("hostC"), makeBlockManagerId("hostB"))) + respondToTaskSet(thirdStage, List( (Success, 42) )) + expectJobResult(Array(42)) } test("recursive shuffle failures") { @@ -422,34 +465,42 @@ class DAGSchedulerSuite extends FunSuite val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null) val finalRdd = makeRdd(1, List(shuffleDepTwo)) - resetExpecting { - expectGetLocations() - expectStage(shuffleOneRdd, List( - (Success, makeMapStatus("hostA", 1)), - (Success, makeMapStatus("hostB", 1)) + val firstStage = interceptStage(shuffleOneRdd) { submitRdd(finalRdd) } + val secondStage = interceptStage(shuffleTwoRdd) { + respondToTaskSet(firstStage, List( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2)) )) - expectStage(shuffleTwoRdd, List( + } + val thirdStage = interceptStage(finalRdd) { + respondToTaskSet(secondStage, List( (Success, makeMapStatus("hostA", 1)), (Success, makeMapStatus("hostC", 1)) )) + } + resetExpecting { blockManagerMaster.removeExecutor("exec-hostA") - expectStage(finalRdd, List( + } + whenExecuting { + respondToTaskSet(thirdStage, List( (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null) )) - // triggers a partial recompute of the first stage, then the second - expectStage(shuffleOneRdd, List( - (Success, makeMapStatus("hostA", 1)) + } + val recomputeOne = interceptStage(shuffleOneRdd) { + scheduler.resubmitFailedStages + } + val recomputeTwo = interceptStage(shuffleTwoRdd) { + respondToTaskSet(recomputeOne, List( + (Success, makeMapStatus("hostA", 2)) )) - expectStage(shuffleTwoRdd, List( + } + val finalStage = interceptStage(finalRdd) { + respondToTaskSet(recomputeTwo, List( (Success, makeMapStatus("hostA", 1)) )) - expectStage(finalRdd, List( - (Success, 42) - )) - } - whenExecuting { - assert(submitRdd(finalRdd) === Array(42)) } + respondToTaskSet(finalStage, List( (Success, 42) )) + expectJobResult(Array(42)) } test("cached post-shuffle") { @@ -459,35 +510,41 @@ class DAGSchedulerSuite extends FunSuite val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null) val finalRdd = makeRdd(1, List(shuffleDepTwo)) - resetExpecting { - expectGetLocations() - expectStage(shuffleOneRdd, List( + val firstShuffleStage = interceptStage(shuffleOneRdd) { submitRdd(finalRdd) } + cacheLocations(shuffleTwoRdd.id -> 0) = Seq(makeBlockManagerId("hostD")) + cacheLocations(shuffleTwoRdd.id -> 1) = Seq(makeBlockManagerId("hostC")) + val secondShuffleStage = interceptStage(shuffleTwoRdd) { + respondToTaskSet(firstShuffleStage, List( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2)) + )) + } + val reduceStage = interceptStage(finalRdd) { + respondToTaskSet(secondShuffleStage, List( (Success, 
makeMapStatus("hostA", 1)), (Success, makeMapStatus("hostB", 1)) )) - expectStageAnd(shuffleTwoRdd, List( - (Success, makeMapStatus("hostA", 1)), - (Success, makeMapStatus("hostC", 1)) - )){ _ => - cacheLocations(shuffleTwoRdd.id -> 0) = Seq(makeBlockManagerId("hostD")) - cacheLocations(shuffleTwoRdd.id -> 1) = Seq(makeBlockManagerId("hostC")) - } + } + resetExpecting { blockManagerMaster.removeExecutor("exec-hostA") - expectStage(finalRdd, List( + } + whenExecuting { + respondToTaskSet(reduceStage, List( (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null) )) - // since we have a cached copy of the missing split of shuffleTwoRdd, we shouldn't - // immediately try to rerun shuffleOneRdd: - expectStage(shuffleTwoRdd, List( + } + // DAGScheduler should notice the cached copy of the second shuffle and try to get it rerun. + val recomputeTwo = interceptStage(shuffleTwoRdd) { + scheduler.resubmitFailedStages() + } + expectTaskSetLocations(recomputeTwo, Seq(Seq("hostD"))) + val finalRetry = interceptStage(finalRdd) { + respondToTaskSet(recomputeTwo, List( (Success, makeMapStatus("hostD", 1)) - ), Some(Seq(List("hostD")))) - expectStage(finalRdd, List( - (Success, 42) )) } - whenExecuting { - assert(submitRdd(finalRdd) === Array(42)) - } + respondToTaskSet(finalRetry, List( (Success, 42) )) + expectJobResult(Array(42)) } test("cached post-shuffle but fails") { @@ -497,45 +554,58 @@ class DAGSchedulerSuite extends FunSuite val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null) val finalRdd = makeRdd(1, List(shuffleDepTwo)) - resetExpecting { - expectGetLocations() - expectStage(shuffleOneRdd, List( + val firstShuffleStage = interceptStage(shuffleOneRdd) { submitRdd(finalRdd) } + cacheLocations(shuffleTwoRdd.id -> 0) = Seq(makeBlockManagerId("hostD")) + cacheLocations(shuffleTwoRdd.id -> 1) = Seq(makeBlockManagerId("hostC")) + val secondShuffleStage = interceptStage(shuffleTwoRdd) { + respondToTaskSet(firstShuffleStage, List( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2)) + )) + } + val reduceStage = interceptStage(finalRdd) { + respondToTaskSet(secondShuffleStage, List( (Success, makeMapStatus("hostA", 1)), (Success, makeMapStatus("hostB", 1)) )) - expectStageAnd(shuffleTwoRdd, List( - (Success, makeMapStatus("hostA", 1)), - (Success, makeMapStatus("hostC", 1)) - )){ _ => - cacheLocations(shuffleTwoRdd.id -> 0) = Seq(makeBlockManagerId("hostD")) - cacheLocations(shuffleTwoRdd.id -> 1) = Seq(makeBlockManagerId("hostC")) - } + } + resetExpecting { blockManagerMaster.removeExecutor("exec-hostA") - expectStage(finalRdd, List( + } + whenExecuting { + respondToTaskSet(reduceStage, List( (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null) )) - // since we have a cached copy of the missing split of shuffleTwoRdd, we shouldn't - // immediately try to rerun shuffleOneRdd: - expectStageAnd(shuffleTwoRdd, List( - (FetchFailed(null, shuffleDepOne.shuffleId, 0, 0), null) - ), Some(Seq(List("hostD")))) { _ => - w { - intercept[FetchFailedException]{ - mapOutputTracker.getServerStatuses(shuffleDepOne.shuffleId, 0) - } - } - cacheLocations.remove(shuffleTwoRdd.id -> 0) - } - // after that fetch failure, we should refetch the cache locations and try to recompute - // the whole chain. Note that we will ignore that a fetch failure previously occured on - // this host. 
- expectStage(shuffleOneRdd, List( (Success, makeMapStatus("hostA", 1)) )) - expectStage(shuffleTwoRdd, List( (Success, makeMapStatus("hostA", 1)) )) - expectStage(finalRdd, List( (Success, 42) )) } - whenExecuting { - assert(submitRdd(finalRdd) === Array(42)) + val recomputeTwoCached = interceptStage(shuffleTwoRdd) { + scheduler.resubmitFailedStages() + } + expectTaskSetLocations(recomputeTwoCached, Seq(Seq("hostD"))) + intercept[FetchFailedException]{ + mapOutputTracker.getServerStatuses(shuffleDepOne.shuffleId, 0) + } + + // Simulate the shuffle input data failing to be cached. + cacheLocations.remove(shuffleTwoRdd.id -> 0) + respondToTaskSet(recomputeTwoCached, List( + (FetchFailed(null, shuffleDepOne.shuffleId, 0, 0), null) + )) + + // After the fetch failure, DAGScheduler should recheck the cache and decide to resubmit + // everything. + val recomputeOne = interceptStage(shuffleOneRdd) { + scheduler.resubmitFailedStages() } + // We use hostA here to make sure DAGScheduler doesn't think it's still dead. + val recomputeTwoUncached = interceptStage(shuffleTwoRdd) { + respondToTaskSet(recomputeOne, List( (Success, makeMapStatus("hostA", 1)) )) + } + expectTaskSetLocations(recomputeTwoUncached, Seq(Seq[String]())) + val finalRetry = interceptStage(finalRdd) { + respondToTaskSet(recomputeTwoUncached, List( (Success, makeMapStatus("hostA", 1)) )) + + } + respondToTaskSet(finalRetry, List( (Success, 42) )) + expectJobResult(Array(42)) } } - -- cgit v1.2.3 From 7f51458774ce4561f1df3ba9b68704c3f63852f3 Mon Sep 17 00:00:00 2001 From: Charles Reiss Date: Wed, 30 Jan 2013 09:34:53 -0800 Subject: Comment at top of DAGSchedulerSuite --- .../test/scala/spark/scheduler/DAGSchedulerSuite.scala | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala index 89173540d4..c31e2e7064 100644 --- a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala @@ -30,9 +30,22 @@ import spark.TaskEndReason import spark.{FetchFailed, Success} +/** + * Tests for DAGScheduler. These tests directly call the event processing functinos in DAGScheduler + * rather than spawning an event loop thread as happens in the real code. They use EasyMock + * to mock out two classes that DAGScheduler interacts with: TaskScheduler (to which TaskSets are + * submitted) and BlockManagerMaster (from which cache locations are retrieved and to which dead + * host notifications are sent). In addition, tests may check for side effects on a non-mocked + * MapOutputTracker instance. + * + * Tests primarily consist of running DAGScheduler#processEvent and + * DAGScheduler#submitWaitingStages (via test utility functions like runEvent or respondToTaskSet) + * and capturing the resulting TaskSets from the mock TaskScheduler. + */ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar with TimeLimitedTests { - // impose a time limit on this test in case we don't let the job finish. + // impose a time limit on this test in case we don't let the job finish, in which case + // JobWaiter#getResult will hang. 
override val timeLimit = Span(5, Seconds) val sc: SparkContext = new SparkContext("local", "DAGSchedulerSuite") -- cgit v1.2.3 From 1fd5ee323d127499bb3f173d4142c37532ec29b2 Mon Sep 17 00:00:00 2001 From: Charles Reiss Date: Fri, 1 Feb 2013 22:33:38 -0800 Subject: Code review changes: add sc.stop; style of multiline comments; parens on procedure calls. --- .../scala/spark/scheduler/DAGSchedulerSuite.scala | 69 +++++++++++++++------- 1 file changed, 47 insertions(+), 22 deletions(-) diff --git a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala index c31e2e7064..adce1f38bb 100644 --- a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala @@ -31,7 +31,7 @@ import spark.TaskEndReason import spark.{FetchFailed, Success} /** - * Tests for DAGScheduler. These tests directly call the event processing functinos in DAGScheduler + * Tests for DAGScheduler. These tests directly call the event processing functions in DAGScheduler * rather than spawning an event loop thread as happens in the real code. They use EasyMock * to mock out two classes that DAGScheduler interacts with: TaskScheduler (to which TaskSets are * submitted) and BlockManagerMaster (from which cache locations are retrieved and to which dead @@ -56,29 +56,34 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar var schedulerThread: Thread = null var schedulerException: Throwable = null - /** Set of EasyMock argument matchers that match a TaskSet for a given RDD. + /** + * Set of EasyMock argument matchers that match a TaskSet for a given RDD. * We cache these so we do not create duplicate matchers for the same RDD. * This allows us to easily setup a sequence of expectations for task sets for * that RDD. */ val taskSetMatchers = new HashMap[MyRDD, IArgumentMatcher] - /** Set of cache locations to return from our mock BlockManagerMaster. + /** + * Set of cache locations to return from our mock BlockManagerMaster. * Keys are (rdd ID, partition ID). Anything not present will return an empty * list of cache locations silently. */ val cacheLocations = new HashMap[(Int, Int), Seq[BlockManagerId]] - /** JobWaiter for the last JobSubmitted event we pushed. To keep tests (most of which + /** + * JobWaiter for the last JobSubmitted event we pushed. To keep tests (most of which * will only submit one job) from needing to explicitly track it. */ var lastJobWaiter: JobWaiter = null - /** Tell EasyMockSugar what mock objects we want to be configured by expecting {...} + /** + * Tell EasyMockSugar what mock objects we want to be configured by expecting {...} * and whenExecuting {...} */ implicit val mocks = MockObjects(taskScheduler, blockManagerMaster) - /** Utility function to reset mocks and set expectations on them. EasyMock wants mock objects + /** + * Utility function to reset mocks and set expectations on them. EasyMock wants mock objects * to be reset after each time their expectations are set, and we tend to check mock object * calls over a single call to DAGScheduler. * @@ -115,17 +120,21 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar whenExecuting { scheduler.stop() } + sc.stop() System.clearProperty("spark.master.port") } def makeBlockManagerId(host: String): BlockManagerId = BlockManagerId("exec-" + host, host, 12345) - /** Type of RDD we use for testing. Note that we should never call the real RDD compute methods. 
- * This is a pair RDD type so it can always be used in ShuffleDependencies. */ + /** + * Type of RDD we use for testing. Note that we should never call the real RDD compute methods. + * This is a pair RDD type so it can always be used in ShuffleDependencies. + */ type MyRDD = RDD[(Int, Int)] - /** Create an RDD for passing to DAGScheduler. These RDDs will use the dependencies and + /** + * Create an RDD for passing to DAGScheduler. These RDDs will use the dependencies and * preferredLocations (if any) that are passed to them. They are deliberately not executable * so we can test that DAGScheduler does not try to execute RDDs locally. */ @@ -150,7 +159,8 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar } } - /** EasyMock matcher method. For use as an argument matcher for a TaskSet whose first task + /** + * EasyMock matcher method. For use as an argument matcher for a TaskSet whose first task * is from a particular RDD. */ def taskSetForRdd(rdd: MyRDD): TaskSet = { @@ -172,7 +182,8 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar return null } - /** Setup an EasyMock expectation to repsond to blockManagerMaster.getLocations() called from + /** + * Setup an EasyMock expectation to repsond to blockManagerMaster.getLocations() called from * cacheLocations. */ def expectGetLocations(): Unit = { @@ -197,7 +208,8 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar }).anyTimes() } - /** Process the supplied event as if it were the top of the DAGScheduler event queue, expecting + /** + * Process the supplied event as if it were the top of the DAGScheduler event queue, expecting * the scheduler not to exit. * * After processing the event, submit waiting stages as is done on most iterations of the @@ -208,7 +220,8 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar scheduler.submitWaitingStages() } - /** Expect a TaskSet for the specified RDD to be submitted to the TaskScheduler. Should be + /** + * Expect a TaskSet for the specified RDD to be submitted to the TaskScheduler. Should be * called from a resetExpecting { ... } block. * * Returns a easymock Capture that will contain the task set after the stage is submitted. @@ -220,7 +233,8 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar return taskSetCapture } - /** Expect the supplied code snippet to submit a stage for the specified RDD. + /** + * Expect the supplied code snippet to submit a stage for the specified RDD. * Return the resulting TaskSet. First marks all the tasks are belonging to the * current MapOutputTracker generation. */ @@ -239,7 +253,9 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar return taskSet } - /** Send the given CompletionEvent messages for the tasks in the TaskSet. */ + /** + * Send the given CompletionEvent messages for the tasks in the TaskSet. + */ def respondToTaskSet(taskSet: TaskSet, results: Seq[(TaskEndReason, Any)]) { assert(taskSet.tasks.size >= results.size) for ((result, i) <- results.zipWithIndex) { @@ -249,7 +265,9 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar } } - /** Assert that the supplied TaskSet has exactly the given preferredLocations. */ + /** + * Assert that the supplied TaskSet has exactly the given preferredLocations. 
+ */ def expectTaskSetLocations(taskSet: TaskSet, locations: Seq[Seq[String]]) { assert(locations.size === taskSet.tasks.size) for ((expectLocs, taskLocs) <- @@ -258,7 +276,8 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar } } - /** When we submit dummy Jobs, this is the compute function we supply. Except in a local test + /** + * When we submit dummy Jobs, this is the compute function we supply. Except in a local test * below, we do not expect this function to ever be executed; instead, we will return results * directly through CompletionEvents. */ @@ -266,8 +285,10 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar it.next._1.asInstanceOf[Int] - /** Start a job to compute the given RDD. Returns the JobWaiter that will - * collect the result of the job via callbacks from DAGScheduler. */ + /** + * Start a job to compute the given RDD. Returns the JobWaiter that will + * collect the result of the job via callbacks from DAGScheduler. + */ def submitRdd(rdd: MyRDD, allowLocal: Boolean = false): JobWaiter = { val (toSubmit, waiter) = scheduler.prepareJob[(Int, Int), Int]( rdd, @@ -281,7 +302,9 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar return waiter } - /** Assert that a job we started has failed. */ + /** + * Assert that a job we started has failed. + */ def expectJobException(waiter: JobWaiter = lastJobWaiter) { waiter.getResult match { case JobSucceeded(_) => fail() @@ -289,7 +312,9 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar } } - /** Assert that a job we started has succeeded and has the given result. */ + /** + * Assert that a job we started has succeeded and has the given result. + */ def expectJobResult(expected: Array[Int], waiter: JobWaiter = lastJobWaiter) { waiter.getResult match { case JobSucceeded(answer) => @@ -500,7 +525,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar )) } val recomputeOne = interceptStage(shuffleOneRdd) { - scheduler.resubmitFailedStages + scheduler.resubmitFailedStages() } val recomputeTwo = interceptStage(shuffleTwoRdd) { respondToTaskSet(recomputeOne, List( -- cgit v1.2.3
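
Taken together, the patches above replace the suite's expectStage/whenExecuting style with an interceptStage/respondToTaskSet style that reads as a straight-line script: submit a job, capture the TaskSet for each stage as the mocked TaskScheduler receives it, feed CompletionEvents back in, and finally check the JobWaiter. The sketch below is illustrative only and is not part of any patch in this series; it assumes it is placed inside DAGSchedulerSuite so it can reuse the suite's helpers (makeRdd, submitRdd, interceptStage, respondToTaskSet, makeMapStatus, expectJobResult) and its mocked TaskScheduler and BlockManagerMaster. The host names and the result value 42 are arbitrary.

test("single shuffle round trip (illustrative sketch)") {
  // One map stage feeding one reduce stage, mirroring the suite's other shuffle tests.
  val shuffleMapRdd = makeRdd(2, Nil)
  val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
  val reduceRdd = makeRdd(1, List(shuffleDep))

  // Submitting the job should make the mocked TaskScheduler receive the map stage first.
  val mapStage = interceptStage(shuffleMapRdd) { submitRdd(reduceRdd) }

  // Completing both map tasks should cause the reduce stage to be submitted.
  val reduceStage = interceptStage(reduceRdd) {
    respondToTaskSet(mapStage, List(
      (Success, makeMapStatus("hostA", 1)),
      (Success, makeMapStatus("hostB", 1))
    ))
  }

  // Completing the single reduce task should complete the job with its result.
  respondToTaskSet(reduceStage, List( (Success, 42) ))
  expectJobResult(Array(42))
}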