From 74d3b23929758328c2a7879381669d81bf899396 Mon Sep 17 00:00:00 2001 From: Stephen Haberman Date: Tue, 15 Jan 2013 14:03:28 -0600 Subject: Add spark.executor.memory to differentiate executor memory from spark-shell memory. --- core/src/main/scala/spark/SparkContext.scala | 4 ++-- core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala | 3 +-- .../spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 11 +++++------ 3 files changed, 8 insertions(+), 10 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index bbf8272eb3..a5a1b75944 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -111,8 +111,8 @@ class SparkContext( // Environment variables to pass to our executors private[spark] val executorEnvs = HashMap[String, String]() - for (key <- Seq("SPARK_MEM", "SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS", - "SPARK_TESTING")) { + // Note: SPARK_MEM isn't included because it's set directly in ExecutorRunner + for (key <- Seq("SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS", "SPARK_TESTING")) { val value = System.getenv(key) if (value != null) { executorEnvs(key) = value diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala index beceb55ecd..2f2ea617ff 100644 --- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala @@ -118,8 +118,7 @@ private[spark] class ExecutorRunner( for ((key, value) <- jobDesc.command.environment) { env.put(key, value) } - env.put("SPARK_CORES", cores.toString) - env.put("SPARK_MEMORY", memory.toString) + env.put("SPARK_MEM", memory.toString) // In case we are running this from within the Spark Shell, avoid creating a "scala" // parent process for the executor command env.put("SPARK_LAUNCH_WITH_SCALA", "0") diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index e2301347e5..f2fb244b24 100644 --- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -23,12 +23,11 @@ private[spark] class SparkDeploySchedulerBackend( // Memory used by each executor (in megabytes) val executorMemory = { - if (System.getenv("SPARK_MEM") != null) { - Utils.memoryStringToMb(System.getenv("SPARK_MEM")) - // TODO: Might need to add some extra memory for the non-heap parts of the JVM - } else { - 512 - } + // TODO: Might need to add some extra memory for the non-heap parts of the JVM + Option(System.getProperty("spark.executor.memory")) + .orElse(Option(System.getenv("SPARK_MEM"))) + .map(Utils.memoryStringToMb) + .getOrElse(512) } override def start() { -- cgit v1.2.3 From 2437f6741b9c5b0a778d55d324aabdc4642889e5 Mon Sep 17 00:00:00 2001 From: Stephen Haberman Date: Tue, 22 Jan 2013 18:01:03 -0600 Subject: Restore SPARK_MEM in executorEnvs. 
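SPARK_MEM goes back into executorEnvs so that Mesos executors still see it; standalone mode overwrites it in ExecutorRunner (see the following patches). The overall resolution order for executor memory, introduced by the previous commit, is the spark.executor.memory Java system property, then the SPARK_MEM environment variable, then a 512 MB default. A minimal sketch of that fallback, reusing the names from the SparkDeploySchedulerBackend diff above (Utils.memoryStringToMb parses strings such as "512m" or "2g" into megabytes):

    // Sketch only: same fallback chain as SparkDeploySchedulerBackend.executorMemory
    val executorMemory: Int =
      Option(System.getProperty("spark.executor.memory"))  // e.g. "2g", set by the application
        .orElse(Option(System.getenv("SPARK_MEM")))         // legacy environment variable
        .map(Utils.memoryStringToMb)                        // "512m" / "2g" -> megabytes
        .getOrElse(512)                                      // default: 512 MB
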
--- core/src/main/scala/spark/SparkContext.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index a5a1b75944..402355bd52 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -111,8 +111,9 @@ class SparkContext( // Environment variables to pass to our executors private[spark] val executorEnvs = HashMap[String, String]() - // Note: SPARK_MEM isn't included because it's set directly in ExecutorRunner - for (key <- Seq("SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS", "SPARK_TESTING")) { + // Note: SPARK_MEM is included for Mesos, but overwritten for standalone mode in ExecutorRunner + for (key <- Seq("SPARK_MEM", "SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS", + "SPARK_TESTING")) { val value = System.getenv(key) if (value != null) { executorEnvs(key) = value -- cgit v1.2.3 From fdec42385a1a8f10f9dd803525cb3c132a25ba53 Mon Sep 17 00:00:00 2001 From: Stephen Haberman Date: Tue, 22 Jan 2013 18:01:12 -0600 Subject: Fix SPARK_MEM in ExecutorRunner. --- core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'core') diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala index 2f2ea617ff..e910416235 100644 --- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala @@ -118,7 +118,7 @@ private[spark] class ExecutorRunner( for ((key, value) <- jobDesc.command.environment) { env.put(key, value) } - env.put("SPARK_MEM", memory.toString) + env.put("SPARK_MEM", memory.toString + "m") // In case we are running this from within the Spark Shell, avoid creating a "scala" // parent process for the executor command env.put("SPARK_LAUNCH_WITH_SCALA", "0") -- cgit v1.2.3 From a423ee546c389b5ce0d2117299456712370d7ad1 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 22 Jan 2013 18:48:43 -0800 Subject: expose RDD & storage info directly via SparkContext --- core/src/main/scala/spark/SparkContext.scala | 16 +++++++++ .../scala/spark/storage/BlockManagerMaster.scala | 4 +++ .../main/scala/spark/storage/BlockManagerUI.scala | 39 +++++++++------------- .../main/scala/spark/storage/StorageUtils.scala | 10 +++--- 4 files changed, 41 insertions(+), 28 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 77036c1275..be992250a9 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -46,6 +46,7 @@ import spark.scheduler.cluster.{SparkDeploySchedulerBackend, SchedulerBackend, C import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} import storage.BlockManagerUI import util.{MetadataCleaner, TimeStampedHashMap} +import storage.{StorageStatus, StorageUtils, RDDInfo} /** * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark @@ -473,6 +474,21 @@ class SparkContext( } } + /** + * Return information about what RDDs are cached, if they are in mem or on disk, how much space + * they take, etc. 
+ */ + def getRDDStorageInfo : Array[RDDInfo] = { + StorageUtils.rddInfoFromStorageStatus(getSlavesStorageStatus, this) + } + + /** + * Return information about blocks stored in all of the slaves + */ + def getSlavesStorageStatus : Array[StorageStatus] = { + env.blockManager.master.getStorageStatus + } + /** * Clear the job's list of files added by `addFile` so that they do not get downloaded to * any new nodes. diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala index 55ff1dde9c..c7ee76f0b7 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala @@ -118,6 +118,10 @@ private[spark] class BlockManagerMaster( askMasterWithRetry[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus) } + def getStorageStatus: Array[StorageStatus] = { + askMasterWithRetry[ArrayBuffer[StorageStatus]](GetStorageStatus).toArray + } + /** Stop the master actor, called only on the Spark master node */ def stop() { if (masterActor != null) { diff --git a/core/src/main/scala/spark/storage/BlockManagerUI.scala b/core/src/main/scala/spark/storage/BlockManagerUI.scala index eda320fa47..52f6d1b657 100644 --- a/core/src/main/scala/spark/storage/BlockManagerUI.scala +++ b/core/src/main/scala/spark/storage/BlockManagerUI.scala @@ -1,13 +1,10 @@ package spark.storage import akka.actor.{ActorRef, ActorSystem} -import akka.pattern.ask import akka.util.Timeout import akka.util.duration._ -import cc.spray.directives._ import cc.spray.typeconversion.TwirlSupport._ import cc.spray.Directives -import scala.collection.mutable.ArrayBuffer import spark.{Logging, SparkContext} import spark.util.AkkaUtils import spark.Utils @@ -48,32 +45,26 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef, path("") { completeWith { // Request the current storage status from the Master - val future = blockManagerMaster ? GetStorageStatus - future.map { status => - // Calculate macro-level statistics - val storageStatusList = status.asInstanceOf[ArrayBuffer[StorageStatus]].toArray - val maxMem = storageStatusList.map(_.maxMem).reduce(_+_) - val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_) - val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)) - .reduceOption(_+_).getOrElse(0L) - val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc) - spark.storage.html.index. - render(maxMem, remainingMem, diskSpaceUsed, rdds, storageStatusList) - } + val storageStatusList = sc.getSlavesStorageStatus + // Calculate macro-level statistics + val maxMem = storageStatusList.map(_.maxMem).reduce(_+_) + val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_) + val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)) + .reduceOption(_+_).getOrElse(0L) + val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc) + spark.storage.html.index. + render(maxMem, remainingMem, diskSpaceUsed, rdds, storageStatusList) } } ~ path("rdd") { parameter("id") { id => completeWith { - val future = blockManagerMaster ? GetStorageStatus - future.map { status => - val prefix = "rdd_" + id.toString - val storageStatusList = status.asInstanceOf[ArrayBuffer[StorageStatus]].toArray - val filteredStorageStatusList = StorageUtils. 
- filterStorageStatusByPrefix(storageStatusList, prefix) - val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head - spark.storage.html.rdd.render(rddInfo, filteredStorageStatusList) - } + val prefix = "rdd_" + id.toString + val storageStatusList = sc.getSlavesStorageStatus + val filteredStorageStatusList = StorageUtils. + filterStorageStatusByPrefix(storageStatusList, prefix) + val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head + spark.storage.html.rdd.render(rddInfo, filteredStorageStatusList) } } } ~ diff --git a/core/src/main/scala/spark/storage/StorageUtils.scala b/core/src/main/scala/spark/storage/StorageUtils.scala index a10e3a95c6..d6e33c8619 100644 --- a/core/src/main/scala/spark/storage/StorageUtils.scala +++ b/core/src/main/scala/spark/storage/StorageUtils.scala @@ -56,9 +56,11 @@ object StorageUtils { // Find the id of the RDD, e.g. rdd_1 => 1 val rddId = rddKey.split("_").last.toInt // Get the friendly name for the rdd, if available. - val rddName = Option(sc.persistentRdds(rddId).name).getOrElse(rddKey) - val rddStorageLevel = sc.persistentRdds(rddId).getStorageLevel - + val rdd = sc.persistentRdds(rddId) + val rddName = Option(rdd.name).getOrElse(rddKey) + val rddStorageLevel = rdd.getStorageLevel + //TODO get total number of partitions in rdd + RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, memSize, diskSize) }.toArray } @@ -75,4 +77,4 @@ object StorageUtils { } -} \ No newline at end of file +} -- cgit v1.2.3 From 0f22c4207f27bc8d1675af82f873141dda754f5c Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Mon, 28 Jan 2013 10:08:59 -0800 Subject: better formatting for RDDInfo --- core/src/main/scala/spark/storage/StorageUtils.scala | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/spark/storage/StorageUtils.scala b/core/src/main/scala/spark/storage/StorageUtils.scala index d6e33c8619..ce7c067eea 100644 --- a/core/src/main/scala/spark/storage/StorageUtils.scala +++ b/core/src/main/scala/spark/storage/StorageUtils.scala @@ -1,6 +1,6 @@ package spark.storage -import spark.SparkContext +import spark.{Utils, SparkContext} import BlockManagerMasterActor.BlockStatus private[spark] @@ -22,8 +22,14 @@ case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long, } case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel, - numPartitions: Int, memSize: Long, diskSize: Long) - + numPartitions: Int, memSize: Long, diskSize: Long) { + override def toString = { + import Utils.memoryBytesToString + import java.lang.{Integer => JInt} + String.format("RDD \"%s\" (%d) Storage: %s; Partitions: %d; MemorySize: %s; DiskSize: %s", name, id.asInstanceOf[JInt], + storageLevel.toString, numPartitions.asInstanceOf[JInt], memoryBytesToString(memSize), memoryBytesToString(diskSize)) + } +} /* Helper methods for storage-related objects */ private[spark] -- cgit v1.2.3 From a34096a76de9d07518ce33111ad43b88049c1ac2 Mon Sep 17 00:00:00 2001 From: Charles Reiss Date: Mon, 28 Jan 2013 22:40:16 -0800 Subject: Add easymock to POMs --- core/pom.xml | 5 +++++ pom.xml | 6 ++++++ 2 files changed, 11 insertions(+) (limited to 'core') diff --git a/core/pom.xml b/core/pom.xml index 862d3ec37a..a2b9b726a6 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -98,6 +98,11 @@ scalacheck_${scala.version} test + + org.easymock + easymock + test + com.novocode junit-interface diff --git a/pom.xml b/pom.xml index 3ea989a082..4a4ff560e7 100644 --- a/pom.xml +++ 
b/pom.xml @@ -273,6 +273,12 @@ 1.8 test + + org.easymock + easymock + 3.1 + test + org.scalacheck scalacheck_${scala.version} -- cgit v1.2.3 From a3d14c0404d6b28433784f84086a29ecc0045a12 Mon Sep 17 00:00:00 2001 From: Charles Reiss Date: Mon, 28 Jan 2013 22:41:08 -0800 Subject: Refactoring to DAGScheduler to aid testing --- core/src/main/scala/spark/SparkContext.scala | 1 + .../main/scala/spark/scheduler/DAGScheduler.scala | 29 +++++++++++++--------- 2 files changed, 18 insertions(+), 12 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index dc9b8688b3..6ae04f4a44 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -187,6 +187,7 @@ class SparkContext( taskScheduler.start() private var dagScheduler = new DAGScheduler(taskScheduler) + dagScheduler.start() /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */ val hadoopConfiguration = { diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index b130be6a38..9655961162 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -23,7 +23,14 @@ import util.{MetadataCleaner, TimeStampedHashMap} * and to report fetch failures (the submitTasks method, and code to add CompletionEvents). */ private[spark] -class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with Logging { +class DAGScheduler(taskSched: TaskScheduler, + mapOutputTracker: MapOutputTracker, + blockManagerMaster: BlockManagerMaster, + env: SparkEnv) + extends TaskSchedulerListener with Logging { + def this(taskSched: TaskScheduler) { + this(taskSched, SparkEnv.get.mapOutputTracker, SparkEnv.get.blockManager.master, SparkEnv.get) + } taskSched.setListener(this) // Called by TaskScheduler to report task completions or failures. @@ -66,10 +73,6 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with var cacheLocs = new HashMap[Int, Array[List[String]]] - val env = SparkEnv.get - val mapOutputTracker = env.mapOutputTracker - val blockManagerMaster = env.blockManager.master - // For tracking failed nodes, we use the MapOutputTracker's generation number, which is // sent with every task. 
When we detect a node failing, we note the current generation number // and failed executor, increment it for new tasks, and use this to ignore stray ShuffleMapTask @@ -90,12 +93,14 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with val metadataCleaner = new MetadataCleaner("DAGScheduler", this.cleanup) // Start a thread to run the DAGScheduler event loop - new Thread("DAGScheduler") { - setDaemon(true) - override def run() { - DAGScheduler.this.run() - } - }.start() + def start() { + new Thread("DAGScheduler") { + setDaemon(true) + override def run() { + DAGScheduler.this.run() + } + }.start() + } def getCacheLocs(rdd: RDD[_]): Array[List[String]] = { if (!cacheLocs.contains(rdd.id)) { @@ -546,7 +551,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with if (!failedGeneration.contains(execId) || failedGeneration(execId) < currentGeneration) { failedGeneration(execId) = currentGeneration logInfo("Executor lost: %s (generation %d)".format(execId, currentGeneration)) - env.blockManager.master.removeExecutor(execId) + blockManagerMaster.removeExecutor(execId) // TODO: This will be really slow if we keep accumulating shuffle map stages for ((shuffleId, stage) <- shuffleToMapStage) { stage.removeOutputsOnExecutor(execId) -- cgit v1.2.3 From 9eac7d01f0880d1d3d51e922ef2566c4ee92989f Mon Sep 17 00:00:00 2001 From: Charles Reiss Date: Mon, 28 Jan 2013 22:42:35 -0800 Subject: Add DAGScheduler tests. --- .../scala/spark/scheduler/DAGSchedulerSuite.scala | 540 +++++++++++++++++++++ 1 file changed, 540 insertions(+) create mode 100644 core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala (limited to 'core') diff --git a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala new file mode 100644 index 0000000000..53f5214d7a --- /dev/null +++ b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala @@ -0,0 +1,540 @@ +package spark.scheduler + +import scala.collection.mutable.{Map, HashMap} + +import org.scalatest.FunSuite +import org.scalatest.BeforeAndAfter +import org.scalatest.concurrent.AsyncAssertions +import org.scalatest.concurrent.TimeLimitedTests +import org.scalatest.mock.EasyMockSugar +import org.scalatest.time.{Span, Seconds} + +import org.easymock.EasyMock._ +import org.easymock.EasyMock +import org.easymock.{IAnswer, IArgumentMatcher} + +import akka.actor.ActorSystem + +import spark.storage.BlockManager +import spark.storage.BlockManagerId +import spark.storage.BlockManagerMaster +import spark.{Dependency, ShuffleDependency, OneToOneDependency} +import spark.FetchFailedException +import spark.MapOutputTracker +import spark.RDD +import spark.SparkContext +import spark.SparkException +import spark.Split +import spark.TaskContext +import spark.TaskEndReason + +import spark.{FetchFailed, Success} + +class DAGSchedulerSuite extends FunSuite + with BeforeAndAfter with EasyMockSugar with TimeLimitedTests + with AsyncAssertions with spark.Logging { + + // If we crash the DAGScheduler thread, our test will probably hang. 
+ override val timeLimit = Span(5, Seconds) + + val sc: SparkContext = new SparkContext("local", "DAGSchedulerSuite") + var scheduler: DAGScheduler = null + var w: Waiter = null + val taskScheduler = mock[TaskScheduler] + val blockManagerMaster = mock[BlockManagerMaster] + var mapOutputTracker: MapOutputTracker = null + var schedulerThread: Thread = null + var schedulerException: Throwable = null + val taskSetMatchers = new HashMap[MyRDD, IArgumentMatcher] + val cacheLocations = new HashMap[(Int, Int), Seq[BlockManagerId]] + + implicit val mocks = MockObjects(taskScheduler, blockManagerMaster) + + def makeBlockManagerId(host: String): BlockManagerId = + BlockManagerId("exec-" + host, host, 12345) + + def resetExpecting(f: => Unit) { + reset(taskScheduler) + reset(blockManagerMaster) + expecting(f) + } + + before { + taskSetMatchers.clear() + cacheLocations.clear() + val actorSystem = ActorSystem("test") + mapOutputTracker = new MapOutputTracker(actorSystem, true) + resetExpecting { + taskScheduler.setListener(anyObject()) + } + whenExecuting { + scheduler = new DAGScheduler(taskScheduler, mapOutputTracker, blockManagerMaster, null) + } + w = new Waiter + schedulerException = null + schedulerThread = new Thread("DAGScheduler under test") { + override def run() { + try { + scheduler.run() + } catch { + case t: Throwable => + logError("Got exception in DAGScheduler: ", t) + schedulerException = t + } finally { + w.dismiss() + } + } + } + schedulerThread.start + logInfo("finished before") + } + + after { + logInfo("started after") + resetExpecting { + taskScheduler.stop() + } + whenExecuting { + scheduler.stop + schedulerThread.join + } + w.await() + if (schedulerException != null) { + throw new Exception("Exception caught from scheduler thread", schedulerException) + } + } + + // Type of RDD we use for testing. Note that we should never call the real RDD compute methods. + // This is a pair RDD type so it can always be used in ShuffleDependencies. + type MyRDD = RDD[(Int, Int)] + + def makeRdd( + numSplits: Int, + dependencies: List[Dependency[_]], + locations: Seq[Seq[String]] = Nil + ): MyRDD = { + val maxSplit = numSplits - 1 + return new MyRDD(sc, dependencies) { + override def compute(split: Split, context: TaskContext): Iterator[(Int, Int)] = + throw new RuntimeException("should not be reached") + override def getSplits() = (0 to maxSplit).map(i => new Split { + override def index = i + }).toArray + override def getPreferredLocations(split: Split): Seq[String] = + if (locations.isDefinedAt(split.index)) + locations(split.index) + else + Nil + override def toString: String = "DAGSchedulerSuiteRDD " + id + } + } + + def taskSetForRdd(rdd: MyRDD): TaskSet = { + val matcher = taskSetMatchers.getOrElseUpdate(rdd, + new IArgumentMatcher { + override def matches(actual: Any): Boolean = { + val taskSet = actual.asInstanceOf[TaskSet] + taskSet.tasks(0) match { + case rt: ResultTask[_, _] => rt.rdd.id == rdd.id + case smt: ShuffleMapTask => smt.rdd.id == rdd.id + case _ => false + } + } + override def appendTo(buf: StringBuffer) { + buf.append("taskSetForRdd(" + rdd + ")") + } + }) + EasyMock.reportMatcher(matcher) + return null + } + + def expectGetLocations(): Unit = { + EasyMock.expect(blockManagerMaster.getLocations(anyObject().asInstanceOf[Array[String]])). 
+ andAnswer(new IAnswer[Seq[Seq[BlockManagerId]]] { + override def answer(): Seq[Seq[BlockManagerId]] = { + val blocks = getCurrentArguments()(0).asInstanceOf[Array[String]] + return blocks.map { name => + val pieces = name.split("_") + if (pieces(0) == "rdd") { + val key = pieces(1).toInt -> pieces(2).toInt + if (cacheLocations.contains(key)) { + cacheLocations(key) + } else { + Seq[BlockManagerId]() + } + } else { + Seq[BlockManagerId]() + } + }.toSeq + } + }).anyTimes() + } + + def expectStageAnd(rdd: MyRDD, results: Seq[(TaskEndReason, Any)], + preferredLocations: Option[Seq[Seq[String]]] = None)(afterSubmit: TaskSet => Unit) { + // TODO: Remember which submission + EasyMock.expect(taskScheduler.submitTasks(taskSetForRdd(rdd))).andAnswer(new IAnswer[Unit] { + override def answer(): Unit = { + val taskSet = getCurrentArguments()(0).asInstanceOf[TaskSet] + for (task <- taskSet.tasks) { + task.generation = mapOutputTracker.getGeneration + } + afterSubmit(taskSet) + preferredLocations match { + case None => + for (taskLocs <- taskSet.tasks.map(_.preferredLocations)) { + w { assert(taskLocs.size === 0) } + } + case Some(locations) => + w { assert(locations.size === taskSet.tasks.size) } + for ((expectLocs, taskLocs) <- + taskSet.tasks.map(_.preferredLocations).zip(locations)) { + w { assert(expectLocs === taskLocs) } + } + } + w { assert(taskSet.tasks.size >= results.size)} + for ((result, i) <- results.zipWithIndex) { + if (i < taskSet.tasks.size) { + scheduler.taskEnded(taskSet.tasks(i), result._1, result._2, Map[Long, Any]()) + } + } + } + }) + } + + def expectStage(rdd: MyRDD, results: Seq[(TaskEndReason, Any)], + preferredLocations: Option[Seq[Seq[String]]] = None) { + expectStageAnd(rdd, results, preferredLocations) { _ => } + } + + def submitRdd(rdd: MyRDD, allowLocal: Boolean = false): Array[Int] = { + return scheduler.runJob[(Int, Int), Int]( + rdd, + (context: TaskContext, it: Iterator[(Int, Int)]) => it.next._1.asInstanceOf[Int], + (0 to (rdd.splits.size - 1)), + "test-site", + allowLocal + ) + } + + def makeMapStatus(host: String, reduces: Int): MapStatus = + new MapStatus(makeBlockManagerId(host), Array.fill[Byte](reduces)(2)) + + test("zero split job") { + val rdd = makeRdd(0, Nil) + resetExpecting { + expectGetLocations() + // deliberately expect no stages to be submitted + } + whenExecuting { + assert(submitRdd(rdd) === Array[Int]()) + } + } + + test("run trivial job") { + val rdd = makeRdd(1, Nil) + resetExpecting { + expectGetLocations() + expectStage(rdd, List( (Success, 42) )) + } + whenExecuting { + assert(submitRdd(rdd) === Array(42)) + } + } + + test("local job") { + val rdd = new MyRDD(sc, Nil) { + override def compute(split: Split, context: TaskContext): Iterator[(Int, Int)] = + Array(42 -> 0).iterator + override def getSplits() = Array( new Split { override def index = 0 } ) + override def getPreferredLocations(split: Split) = Nil + override def toString = "DAGSchedulerSuite Local RDD" + } + resetExpecting { + expectGetLocations() + // deliberately expect no stages to be submitted + } + whenExecuting { + assert(submitRdd(rdd, true) === Array(42)) + } + } + + test("run trivial job w/ dependency") { + val baseRdd = makeRdd(1, Nil) + val finalRdd = makeRdd(1, List(new OneToOneDependency(baseRdd))) + resetExpecting { + expectGetLocations() + expectStage(finalRdd, List( (Success, 42) )) + } + whenExecuting { + assert(submitRdd(finalRdd) === Array(42)) + } + } + + test("location preferences w/ dependency") { + val baseRdd = makeRdd(1, Nil) + val finalRdd = makeRdd(1, 
List(new OneToOneDependency(baseRdd))) + resetExpecting { + expectGetLocations() + cacheLocations(baseRdd.id -> 0) = + Seq(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")) + expectStage(finalRdd, List( (Success, 42) ), + Some(List(Seq("hostA", "hostB")))) + } + whenExecuting { + assert(submitRdd(finalRdd) === Array(42)) + } + } + + test("trivial job failure") { + val rdd = makeRdd(1, Nil) + resetExpecting { + expectGetLocations() + expectStageAnd(rdd, List()) { taskSet => scheduler.taskSetFailed(taskSet, "test failure") } + } + whenExecuting(taskScheduler, blockManagerMaster) { + intercept[SparkException] { submitRdd(rdd) } + } + } + + test("run trivial shuffle") { + val shuffleMapRdd = makeRdd(2, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) + val shuffleId = shuffleDep.shuffleId + val reduceRdd = makeRdd(1, List(shuffleDep)) + + resetExpecting { + expectGetLocations() + expectStage(shuffleMapRdd, List( + (Success, makeMapStatus("hostA", 1)), + (Success, makeMapStatus("hostB", 1)) + )) + expectStageAnd(reduceRdd, List( (Success, 42) )) { _ => + w { assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === + Array(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))) } + } + } + whenExecuting { + assert(submitRdd(reduceRdd) === Array(42)) + } + } + + test("run trivial shuffle with fetch failure") { + val shuffleMapRdd = makeRdd(2, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) + val shuffleId = shuffleDep.shuffleId + val reduceRdd = makeRdd(2, List(shuffleDep)) + + resetExpecting { + expectGetLocations() + expectStage(shuffleMapRdd, List( + (Success, makeMapStatus("hostA", 1)), + (Success, makeMapStatus("hostB", 1)) + )) + blockManagerMaster.removeExecutor("exec-hostA") + expectStage(reduceRdd, List( + (Success, 42), + (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0), null) + )) + // partial recompute + expectStage(shuffleMapRdd, List( (Success, makeMapStatus("hostA", 1)) )) + expectStageAnd(reduceRdd, List( (Success, 43) )) { _ => + w { assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === + Array(makeBlockManagerId("hostA"), + makeBlockManagerId("hostB"))) } + } + } + whenExecuting { + assert(submitRdd(reduceRdd) === Array(42, 43)) + } + } + + test("ignore late map task completions") { + val shuffleMapRdd = makeRdd(2, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) + val shuffleId = shuffleDep.shuffleId + val reduceRdd = makeRdd(2, List(shuffleDep)) + + resetExpecting { + expectGetLocations() + expectStageAnd(shuffleMapRdd, List( + (Success, makeMapStatus("hostA", 1)) + )) { taskSet => + val newGeneration = mapOutputTracker.getGeneration + 1 + scheduler.executorLost("exec-hostA") + val noAccum = Map[Long, Any]() + // We rely on the event queue being ordered and increasing the generation number by 1 + // should be ignored for being too old + scheduler.taskEnded(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum) + // should work because it's a non-failed host + scheduler.taskEnded(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), noAccum) + // should be ignored for being too old + scheduler.taskEnded(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum) + // should be ignored (not end the stage) because it's too old + scheduler.taskEnded(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), noAccum) + taskSet.tasks(1).generation = newGeneration + scheduler.taskEnded(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), noAccum) + } + 
blockManagerMaster.removeExecutor("exec-hostA") + expectStageAnd(reduceRdd, List( + (Success, 42), (Success, 43) + )) { _ => + w { assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === + Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA"))) } + } + } + whenExecuting { + assert(submitRdd(reduceRdd) === Array(42, 43)) + } + } + + test("run trivial shuffle with out-of-band failure") { + val shuffleMapRdd = makeRdd(2, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) + val shuffleId = shuffleDep.shuffleId + val reduceRdd = makeRdd(1, List(shuffleDep)) + resetExpecting { + expectGetLocations() + blockManagerMaster.removeExecutor("exec-hostA") + expectStageAnd(shuffleMapRdd, List( + (Success, makeMapStatus("hostA", 1)), + (Success, makeMapStatus("hostB", 1)) + )) { _ => scheduler.executorLost("exec-hostA") } + expectStage(shuffleMapRdd, List( + (Success, makeMapStatus("hostC", 1)) + )) + expectStageAnd(reduceRdd, List( (Success, 42) )) { _ => + w { assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === + Array(makeBlockManagerId("hostC"), + makeBlockManagerId("hostB"))) } + } + } + whenExecuting { + assert(submitRdd(reduceRdd) === Array(42)) + } + } + + test("recursive shuffle failures") { + val shuffleOneRdd = makeRdd(2, Nil) + val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null) + val shuffleTwoRdd = makeRdd(2, List(shuffleDepOne)) + val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null) + val finalRdd = makeRdd(1, List(shuffleDepTwo)) + + resetExpecting { + expectGetLocations() + expectStage(shuffleOneRdd, List( + (Success, makeMapStatus("hostA", 1)), + (Success, makeMapStatus("hostB", 1)) + )) + expectStage(shuffleTwoRdd, List( + (Success, makeMapStatus("hostA", 1)), + (Success, makeMapStatus("hostC", 1)) + )) + blockManagerMaster.removeExecutor("exec-hostA") + expectStage(finalRdd, List( + (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null) + )) + // triggers a partial recompute of the first stage, then the second + expectStage(shuffleOneRdd, List( + (Success, makeMapStatus("hostA", 1)) + )) + expectStage(shuffleTwoRdd, List( + (Success, makeMapStatus("hostA", 1)) + )) + expectStage(finalRdd, List( + (Success, 42) + )) + } + whenExecuting { + assert(submitRdd(finalRdd) === Array(42)) + } + } + + test("cached post-shuffle") { + val shuffleOneRdd = makeRdd(2, Nil) + val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null) + val shuffleTwoRdd = makeRdd(2, List(shuffleDepOne)) + val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null) + val finalRdd = makeRdd(1, List(shuffleDepTwo)) + + resetExpecting { + expectGetLocations() + expectStage(shuffleOneRdd, List( + (Success, makeMapStatus("hostA", 1)), + (Success, makeMapStatus("hostB", 1)) + )) + expectStageAnd(shuffleTwoRdd, List( + (Success, makeMapStatus("hostA", 1)), + (Success, makeMapStatus("hostC", 1)) + )){ _ => + cacheLocations(shuffleTwoRdd.id -> 0) = Seq(makeBlockManagerId("hostD")) + cacheLocations(shuffleTwoRdd.id -> 1) = Seq(makeBlockManagerId("hostC")) + } + blockManagerMaster.removeExecutor("exec-hostA") + expectStage(finalRdd, List( + (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null) + )) + // since we have a cached copy of the missing split of shuffleTwoRdd, we shouldn't + // immediately try to rerun shuffleOneRdd: + expectStage(shuffleTwoRdd, List( + (Success, makeMapStatus("hostD", 1)) + ), Some(Seq(List("hostD")))) + expectStage(finalRdd, List( + (Success, 42) + )) + } + 
whenExecuting { + assert(submitRdd(finalRdd) === Array(42)) + } + } + + test("cached post-shuffle but fails") { + val shuffleOneRdd = makeRdd(2, Nil) + val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null) + val shuffleTwoRdd = makeRdd(2, List(shuffleDepOne)) + val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null) + val finalRdd = makeRdd(1, List(shuffleDepTwo)) + + resetExpecting { + expectGetLocations() + expectStage(shuffleOneRdd, List( + (Success, makeMapStatus("hostA", 1)), + (Success, makeMapStatus("hostB", 1)) + )) + expectStageAnd(shuffleTwoRdd, List( + (Success, makeMapStatus("hostA", 1)), + (Success, makeMapStatus("hostC", 1)) + )){ _ => + cacheLocations(shuffleTwoRdd.id -> 0) = Seq(makeBlockManagerId("hostD")) + cacheLocations(shuffleTwoRdd.id -> 1) = Seq(makeBlockManagerId("hostC")) + } + blockManagerMaster.removeExecutor("exec-hostA") + expectStage(finalRdd, List( + (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null) + )) + // since we have a cached copy of the missing split of shuffleTwoRdd, we shouldn't + // immediately try to rerun shuffleOneRdd: + expectStageAnd(shuffleTwoRdd, List( + (FetchFailed(null, shuffleDepOne.shuffleId, 0, 0), null) + ), Some(Seq(List("hostD")))) { _ => + w { + intercept[FetchFailedException]{ + mapOutputTracker.getServerStatuses(shuffleDepOne.shuffleId, 0) + } + } + cacheLocations.remove(shuffleTwoRdd.id -> 0) + } + // after that fetch failure, we should refetch the cache locations and try to recompute + // the whole chain. Note that we will ignore that a fetch failure previously occured on + // this host. + expectStage(shuffleOneRdd, List( (Success, makeMapStatus("hostA", 1)) )) + expectStage(shuffleTwoRdd, List( (Success, makeMapStatus("hostA", 1)) )) + expectStage(finalRdd, List( (Success, 42) )) + } + whenExecuting { + assert(submitRdd(finalRdd) === Array(42)) + } + } +} + -- cgit v1.2.3 From 4bf3d7ea1252454ca584a3dabf26bdeab4069409 Mon Sep 17 00:00:00 2001 From: Charles Reiss Date: Tue, 29 Jan 2013 19:05:45 -0800 Subject: Clear spark.master.port to cleanup for other tests --- core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala | 1 + 1 file changed, 1 insertion(+) (limited to 'core') diff --git a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala index 53f5214d7a..6c577c2685 100644 --- a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala @@ -102,6 +102,7 @@ class DAGSchedulerSuite extends FunSuite if (schedulerException != null) { throw new Exception("Exception caught from scheduler thread", schedulerException) } + System.clearProperty("spark.master.port") } // Type of RDD we use for testing. Note that we should never call the real RDD compute methods. -- cgit v1.2.3 From 178b89204c9dbee36886e757ddaafbd079672f4a Mon Sep 17 00:00:00 2001 From: Charles Reiss Date: Wed, 30 Jan 2013 09:19:55 -0800 Subject: Refactor DAGScheduler more to allow testing without a separate thread. 
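The refactor below pulls the body of the event loop out into processEvent(), resubmitFailedStages() and submitWaitingStages(), so a test can feed events to the scheduler synchronously instead of going through the daemon thread. A minimal sketch of how a test can drive it, mirroring the runEvent helper that the DAGSchedulerSuite change further below adds:

    // Sketch: drive the scheduler one event at a time, as DAGSchedulerSuite does below
    def runEvent(scheduler: DAGScheduler, event: DAGSchedulerEvent) {
      assert(!scheduler.processEvent(event))  // returns true only for StopDAGScheduler
      scheduler.submitWaitingStages()         // what most iterations of run() do afterwards
    }
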
--- .../main/scala/spark/scheduler/DAGScheduler.scala | 176 +++++++++++++-------- 1 file changed, 111 insertions(+), 65 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index 9655961162..6892509ed1 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -23,11 +23,13 @@ import util.{MetadataCleaner, TimeStampedHashMap} * and to report fetch failures (the submitTasks method, and code to add CompletionEvents). */ private[spark] -class DAGScheduler(taskSched: TaskScheduler, - mapOutputTracker: MapOutputTracker, - blockManagerMaster: BlockManagerMaster, - env: SparkEnv) - extends TaskSchedulerListener with Logging { +class DAGScheduler( + taskSched: TaskScheduler, + mapOutputTracker: MapOutputTracker, + blockManagerMaster: BlockManagerMaster, + env: SparkEnv) + extends TaskSchedulerListener with Logging { + def this(taskSched: TaskScheduler) { this(taskSched, SparkEnv.get.mapOutputTracker, SparkEnv.get.blockManager.master, SparkEnv.get) } @@ -203,6 +205,27 @@ class DAGScheduler(taskSched: TaskScheduler, missing.toList } + /** Returns (and does not) submit a JobSubmitted event suitable to run a given job, and + * a JobWaiter whose getResult() method will return the result of the job when it is complete. + * + * The job is assumed to have at least one partition; zero partition jobs should be handled + * without a JobSubmitted event. + */ + private[scheduler] def prepareJob[T, U: ClassManifest]( + finalRdd: RDD[T], + func: (TaskContext, Iterator[T]) => U, + partitions: Seq[Int], + callSite: String, + allowLocal: Boolean) + : (JobSubmitted, JobWaiter) = + { + assert(partitions.size > 0) + val waiter = new JobWaiter(partitions.size) + val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] + val toSubmit = JobSubmitted(finalRdd, func2, partitions.toArray, allowLocal, callSite, waiter) + return (toSubmit, waiter) + } + def runJob[T, U: ClassManifest]( finalRdd: RDD[T], func: (TaskContext, Iterator[T]) => U, @@ -214,9 +237,8 @@ class DAGScheduler(taskSched: TaskScheduler, if (partitions.size == 0) { return new Array[U](0) } - val waiter = new JobWaiter(partitions.size) - val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] - eventQueue.put(JobSubmitted(finalRdd, func2, partitions.toArray, allowLocal, callSite, waiter)) + val (toSubmit, waiter) = prepareJob(finalRdd, func, partitions, callSite, allowLocal) + eventQueue.put(toSubmit) waiter.getResult() match { case JobSucceeded(results: Seq[_]) => return results.asInstanceOf[Seq[U]].toArray @@ -241,6 +263,81 @@ class DAGScheduler(taskSched: TaskScheduler, return listener.getResult() // Will throw an exception if the job fails } + /** Process one event retrieved from the event queue. + * Returns true if we should stop the event loop. 
+ */ + private[scheduler] def processEvent(event: DAGSchedulerEvent): Boolean = { + event match { + case JobSubmitted(finalRDD, func, partitions, allowLocal, callSite, listener) => + val runId = nextRunId.getAndIncrement() + val finalStage = newStage(finalRDD, None, runId) + val job = new ActiveJob(runId, finalStage, func, partitions, callSite, listener) + clearCacheLocs() + logInfo("Got job " + job.runId + " (" + callSite + ") with " + partitions.length + + " output partitions (allowLocal=" + allowLocal + ")") + logInfo("Final stage: " + finalStage + " (" + finalStage.origin + ")") + logInfo("Parents of final stage: " + finalStage.parents) + logInfo("Missing parents: " + getMissingParentStages(finalStage)) + if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) { + // Compute very short actions like first() or take() with no parent stages locally. + runLocally(job) + } else { + activeJobs += job + resultStageToJob(finalStage) = job + submitStage(finalStage) + } + + case ExecutorLost(execId) => + handleExecutorLost(execId) + + case completion: CompletionEvent => + handleTaskCompletion(completion) + + case TaskSetFailed(taskSet, reason) => + abortStage(idToStage(taskSet.stageId), reason) + + case StopDAGScheduler => + // Cancel any active jobs + for (job <- activeJobs) { + val error = new SparkException("Job cancelled because SparkContext was shut down") + job.listener.jobFailed(error) + } + return true + } + return false + } + + /** Resubmit any failed stages. Ordinarily called after a small amount of time has passed since + * the last fetch failure. + */ + private[scheduler] def resubmitFailedStages() { + logInfo("Resubmitting failed stages") + clearCacheLocs() + val failed2 = failed.toArray + failed.clear() + for (stage <- failed2.sortBy(_.priority)) { + submitStage(stage) + } + } + + /** Check for waiting or failed stages which are now eligible for resubmission. + * Ordinarily run on every iteration of the event loop. + */ + private[scheduler] def submitWaitingStages() { + // TODO: We might want to run this less often, when we are sure that something has become + // runnable that wasn't before. + logTrace("Checking for newly runnable parent stages") + logTrace("running: " + running) + logTrace("waiting: " + waiting) + logTrace("failed: " + failed) + val waiting2 = waiting.toArray + waiting.clear() + for (stage <- waiting2.sortBy(_.priority)) { + submitStage(stage) + } + } + + /** * The main event loop of the DAG scheduler, which waits for new-job / task-finished / failure * events and responds by launching tasks. 
This runs in a dedicated thread and receives events @@ -251,77 +348,26 @@ class DAGScheduler(taskSched: TaskScheduler, while (true) { val event = eventQueue.poll(POLL_TIMEOUT, TimeUnit.MILLISECONDS) - val time = System.currentTimeMillis() // TODO: use a pluggable clock for testability if (event != null) { logDebug("Got event of type " + event.getClass.getName) } - event match { - case JobSubmitted(finalRDD, func, partitions, allowLocal, callSite, listener) => - val runId = nextRunId.getAndIncrement() - val finalStage = newStage(finalRDD, None, runId) - val job = new ActiveJob(runId, finalStage, func, partitions, callSite, listener) - clearCacheLocs() - logInfo("Got job " + job.runId + " (" + callSite + ") with " + partitions.length + - " output partitions") - logInfo("Final stage: " + finalStage + " (" + finalStage.origin + ")") - logInfo("Parents of final stage: " + finalStage.parents) - logInfo("Missing parents: " + getMissingParentStages(finalStage)) - if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) { - // Compute very short actions like first() or take() with no parent stages locally. - runLocally(job) - } else { - activeJobs += job - resultStageToJob(finalStage) = job - submitStage(finalStage) - } - - case ExecutorLost(execId) => - handleExecutorLost(execId) - - case completion: CompletionEvent => - handleTaskCompletion(completion) - - case TaskSetFailed(taskSet, reason) => - abortStage(idToStage(taskSet.stageId), reason) - - case StopDAGScheduler => - // Cancel any active jobs - for (job <- activeJobs) { - val error = new SparkException("Job cancelled because SparkContext was shut down") - job.listener.jobFailed(error) - } + if (event != null) { + if (processEvent(event)) { return - - case null => - // queue.poll() timed out, ignore it + } } + val time = System.currentTimeMillis() // TODO: use a pluggable clock for testability // Periodically resubmit failed stages if some map output fetches have failed and we have // waited at least RESUBMIT_TIMEOUT. We wait for this short time because when a node fails, // tasks on many other nodes are bound to get a fetch failure, and they won't all get it at // the same time, so we want to make sure we've identified all the reduce tasks that depend // on the failed node. if (failed.size > 0 && time > lastFetchFailureTime + RESUBMIT_TIMEOUT) { - logInfo("Resubmitting failed stages") - clearCacheLocs() - val failed2 = failed.toArray - failed.clear() - for (stage <- failed2.sortBy(_.priority)) { - submitStage(stage) - } + resubmitFailedStages } else { - // TODO: We might want to run this less often, when we are sure that something has become - // runnable that wasn't before. - logTrace("Checking for newly runnable parent stages") - logTrace("running: " + running) - logTrace("waiting: " + waiting) - logTrace("failed: " + failed) - val waiting2 = waiting.toArray - waiting.clear() - for (stage <- waiting2.sortBy(_.priority)) { - submitStage(stage) - } + submitWaitingStages } } } -- cgit v1.2.3 From 9c0bae75ade9e5b5a69077a5719adf4ee96e2c2e Mon Sep 17 00:00:00 2001 From: Charles Reiss Date: Wed, 30 Jan 2013 09:22:07 -0800 Subject: Change DAGSchedulerSuite to run DAGScheduler in the same Thread. 
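Instead of running the scheduler in a background thread and asserting through a Waiter, the reworked suite intercepts each TaskSet that DAGScheduler hands to the mocked TaskScheduler and answers it with CompletionEvents on the test thread. The basic pattern is essentially the "run trivial job" test from the diff below (helper names as defined there):

    // Capture the TaskSet submitted for the stage, then complete its task directly
    val taskSet = interceptStage(rdd) { submitRdd(rdd) }
    respondToTaskSet(taskSet, List( (Success, 42) ))
    expectJobResult(Array(42))   // the JobWaiter reports the result synchronously
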
--- .../scala/spark/scheduler/DAGSchedulerSuite.scala | 568 ++++++++++++--------- 1 file changed, 319 insertions(+), 249 deletions(-) (limited to 'core') diff --git a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala index 6c577c2685..89173540d4 100644 --- a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala @@ -4,12 +4,12 @@ import scala.collection.mutable.{Map, HashMap} import org.scalatest.FunSuite import org.scalatest.BeforeAndAfter -import org.scalatest.concurrent.AsyncAssertions import org.scalatest.concurrent.TimeLimitedTests import org.scalatest.mock.EasyMockSugar import org.scalatest.time.{Span, Seconds} import org.easymock.EasyMock._ +import org.easymock.Capture import org.easymock.EasyMock import org.easymock.{IAnswer, IArgumentMatcher} @@ -30,33 +30,55 @@ import spark.TaskEndReason import spark.{FetchFailed, Success} -class DAGSchedulerSuite extends FunSuite - with BeforeAndAfter with EasyMockSugar with TimeLimitedTests - with AsyncAssertions with spark.Logging { +class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar with TimeLimitedTests { - // If we crash the DAGScheduler thread, our test will probably hang. + // impose a time limit on this test in case we don't let the job finish. override val timeLimit = Span(5, Seconds) val sc: SparkContext = new SparkContext("local", "DAGSchedulerSuite") var scheduler: DAGScheduler = null - var w: Waiter = null val taskScheduler = mock[TaskScheduler] val blockManagerMaster = mock[BlockManagerMaster] var mapOutputTracker: MapOutputTracker = null var schedulerThread: Thread = null var schedulerException: Throwable = null + + /** Set of EasyMock argument matchers that match a TaskSet for a given RDD. + * We cache these so we do not create duplicate matchers for the same RDD. + * This allows us to easily setup a sequence of expectations for task sets for + * that RDD. + */ val taskSetMatchers = new HashMap[MyRDD, IArgumentMatcher] + + /** Set of cache locations to return from our mock BlockManagerMaster. + * Keys are (rdd ID, partition ID). Anything not present will return an empty + * list of cache locations silently. + */ val cacheLocations = new HashMap[(Int, Int), Seq[BlockManagerId]] - implicit val mocks = MockObjects(taskScheduler, blockManagerMaster) + /** JobWaiter for the last JobSubmitted event we pushed. To keep tests (most of which + * will only submit one job) from needing to explicitly track it. + */ + var lastJobWaiter: JobWaiter = null - def makeBlockManagerId(host: String): BlockManagerId = - BlockManagerId("exec-" + host, host, 12345) + /** Tell EasyMockSugar what mock objects we want to be configured by expecting {...} + * and whenExecuting {...} */ + implicit val mocks = MockObjects(taskScheduler, blockManagerMaster) + /** Utility function to reset mocks and set expectations on them. EasyMock wants mock objects + * to be reset after each time their expectations are set, and we tend to check mock object + * calls over a single call to DAGScheduler. + * + * We also set a default expectation here that blockManagerMaster.getLocations can be called + * and will return values from cacheLocations. 
+ */ def resetExpecting(f: => Unit) { reset(taskScheduler) reset(blockManagerMaster) - expecting(f) + expecting { + expectGetLocations() + f + } } before { @@ -70,45 +92,30 @@ class DAGSchedulerSuite extends FunSuite whenExecuting { scheduler = new DAGScheduler(taskScheduler, mapOutputTracker, blockManagerMaster, null) } - w = new Waiter - schedulerException = null - schedulerThread = new Thread("DAGScheduler under test") { - override def run() { - try { - scheduler.run() - } catch { - case t: Throwable => - logError("Got exception in DAGScheduler: ", t) - schedulerException = t - } finally { - w.dismiss() - } - } - } - schedulerThread.start - logInfo("finished before") } after { - logInfo("started after") + assert(scheduler.processEvent(StopDAGScheduler)) resetExpecting { taskScheduler.stop() } whenExecuting { - scheduler.stop - schedulerThread.join - } - w.await() - if (schedulerException != null) { - throw new Exception("Exception caught from scheduler thread", schedulerException) + scheduler.stop() } System.clearProperty("spark.master.port") } - // Type of RDD we use for testing. Note that we should never call the real RDD compute methods. - // This is a pair RDD type so it can always be used in ShuffleDependencies. + def makeBlockManagerId(host: String): BlockManagerId = + BlockManagerId("exec-" + host, host, 12345) + + /** Type of RDD we use for testing. Note that we should never call the real RDD compute methods. + * This is a pair RDD type so it can always be used in ShuffleDependencies. */ type MyRDD = RDD[(Int, Int)] + /** Create an RDD for passing to DAGScheduler. These RDDs will use the dependencies and + * preferredLocations (if any) that are passed to them. They are deliberately not executable + * so we can test that DAGScheduler does not try to execute RDDs locally. + */ def makeRdd( numSplits: Int, dependencies: List[Dependency[_]], @@ -130,6 +137,9 @@ class DAGSchedulerSuite extends FunSuite } } + /** EasyMock matcher method. For use as an argument matcher for a TaskSet whose first task + * is from a particular RDD. + */ def taskSetForRdd(rdd: MyRDD): TaskSet = { val matcher = taskSetMatchers.getOrElseUpdate(rdd, new IArgumentMatcher { @@ -149,6 +159,9 @@ class DAGSchedulerSuite extends FunSuite return null } + /** Setup an EasyMock expectation to repsond to blockManagerMaster.getLocations() called from + * cacheLocations. + */ def expectGetLocations(): Unit = { EasyMock.expect(blockManagerMaster.getLocations(anyObject().asInstanceOf[Array[String]])). 
andAnswer(new IAnswer[Seq[Seq[BlockManagerId]]] { @@ -171,51 +184,106 @@ class DAGSchedulerSuite extends FunSuite }).anyTimes() } - def expectStageAnd(rdd: MyRDD, results: Seq[(TaskEndReason, Any)], - preferredLocations: Option[Seq[Seq[String]]] = None)(afterSubmit: TaskSet => Unit) { - // TODO: Remember which submission - EasyMock.expect(taskScheduler.submitTasks(taskSetForRdd(rdd))).andAnswer(new IAnswer[Unit] { - override def answer(): Unit = { - val taskSet = getCurrentArguments()(0).asInstanceOf[TaskSet] - for (task <- taskSet.tasks) { - task.generation = mapOutputTracker.getGeneration - } - afterSubmit(taskSet) - preferredLocations match { - case None => - for (taskLocs <- taskSet.tasks.map(_.preferredLocations)) { - w { assert(taskLocs.size === 0) } - } - case Some(locations) => - w { assert(locations.size === taskSet.tasks.size) } - for ((expectLocs, taskLocs) <- - taskSet.tasks.map(_.preferredLocations).zip(locations)) { - w { assert(expectLocs === taskLocs) } - } - } - w { assert(taskSet.tasks.size >= results.size)} - for ((result, i) <- results.zipWithIndex) { - if (i < taskSet.tasks.size) { - scheduler.taskEnded(taskSet.tasks(i), result._1, result._2, Map[Long, Any]()) - } - } + /** Process the supplied event as if it were the top of the DAGScheduler event queue, expecting + * the scheduler not to exit. + * + * After processing the event, submit waiting stages as is done on most iterations of the + * DAGScheduler event loop. + */ + def runEvent(event: DAGSchedulerEvent) { + assert(!scheduler.processEvent(event)) + scheduler.submitWaitingStages() + } + + /** Expect a TaskSet for the specified RDD to be submitted to the TaskScheduler. Should be + * called from a resetExpecting { ... } block. + * + * Returns a easymock Capture that will contain the task set after the stage is submitted. + * Most tests should use interceptStage() instead of this directly. + */ + def expectStage(rdd: MyRDD): Capture[TaskSet] = { + val taskSetCapture = new Capture[TaskSet] + taskScheduler.submitTasks(and(capture(taskSetCapture), taskSetForRdd(rdd))) + return taskSetCapture + } + + /** Expect the supplied code snippet to submit a stage for the specified RDD. + * Return the resulting TaskSet. First marks all the tasks are belonging to the + * current MapOutputTracker generation. + */ + def interceptStage(rdd: MyRDD)(f: => Unit): TaskSet = { + var capture: Capture[TaskSet] = null + resetExpecting { + capture = expectStage(rdd) + } + whenExecuting { + f + } + val taskSet = capture.getValue + for (task <- taskSet.tasks) { + task.generation = mapOutputTracker.getGeneration + } + return taskSet + } + + /** Send the given CompletionEvent messages for the tasks in the TaskSet. */ + def respondToTaskSet(taskSet: TaskSet, results: Seq[(TaskEndReason, Any)]) { + assert(taskSet.tasks.size >= results.size) + for ((result, i) <- results.zipWithIndex) { + if (i < taskSet.tasks.size) { + runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, Map[Long, Any]())) } - }) + } } - def expectStage(rdd: MyRDD, results: Seq[(TaskEndReason, Any)], - preferredLocations: Option[Seq[Seq[String]]] = None) { - expectStageAnd(rdd, results, preferredLocations) { _ => } + /** Assert that the supplied TaskSet has exactly the given preferredLocations. 
*/ + def expectTaskSetLocations(taskSet: TaskSet, locations: Seq[Seq[String]]) { + assert(locations.size === taskSet.tasks.size) + for ((expectLocs, taskLocs) <- + taskSet.tasks.map(_.preferredLocations).zip(locations)) { + assert(expectLocs === taskLocs) + } } - def submitRdd(rdd: MyRDD, allowLocal: Boolean = false): Array[Int] = { - return scheduler.runJob[(Int, Int), Int]( + /** When we submit dummy Jobs, this is the compute function we supply. Except in a local test + * below, we do not expect this function to ever be executed; instead, we will return results + * directly through CompletionEvents. + */ + def jobComputeFunc(context: TaskContext, it: Iterator[(Int, Int)]): Int = + it.next._1.asInstanceOf[Int] + + + /** Start a job to compute the given RDD. Returns the JobWaiter that will + * collect the result of the job via callbacks from DAGScheduler. */ + def submitRdd(rdd: MyRDD, allowLocal: Boolean = false): JobWaiter = { + val (toSubmit, waiter) = scheduler.prepareJob[(Int, Int), Int]( rdd, - (context: TaskContext, it: Iterator[(Int, Int)]) => it.next._1.asInstanceOf[Int], + jobComputeFunc, (0 to (rdd.splits.size - 1)), "test-site", allowLocal ) + lastJobWaiter = waiter + runEvent(toSubmit) + return waiter + } + + /** Assert that a job we started has failed. */ + def expectJobException(waiter: JobWaiter = lastJobWaiter) { + waiter.getResult match { + case JobSucceeded(_) => fail() + case JobFailed(_) => return + } + } + + /** Assert that a job we started has succeeded and has the given result. */ + def expectJobResult(expected: Array[Int], waiter: JobWaiter = lastJobWaiter) { + waiter.getResult match { + case JobSucceeded(answer) => + assert(expected === answer.asInstanceOf[Seq[Int]].toArray ) + case JobFailed(_) => + fail() + } } def makeMapStatus(host: String, reduces: Int): MapStatus = @@ -223,24 +291,14 @@ class DAGSchedulerSuite extends FunSuite test("zero split job") { val rdd = makeRdd(0, Nil) - resetExpecting { - expectGetLocations() - // deliberately expect no stages to be submitted - } - whenExecuting { - assert(submitRdd(rdd) === Array[Int]()) - } + assert(scheduler.runJob(rdd, jobComputeFunc, Seq(), "test-site", false) === Array[Int]()) } test("run trivial job") { val rdd = makeRdd(1, Nil) - resetExpecting { - expectGetLocations() - expectStage(rdd, List( (Success, 42) )) - } - whenExecuting { - assert(submitRdd(rdd) === Array(42)) - } + val taskSet = interceptStage(rdd) { submitRdd(rdd) } + respondToTaskSet(taskSet, List( (Success, 42) )) + expectJobResult(Array(42)) } test("local job") { @@ -251,51 +309,34 @@ class DAGSchedulerSuite extends FunSuite override def getPreferredLocations(split: Split) = Nil override def toString = "DAGSchedulerSuite Local RDD" } - resetExpecting { - expectGetLocations() - // deliberately expect no stages to be submitted - } - whenExecuting { - assert(submitRdd(rdd, true) === Array(42)) - } + submitRdd(rdd, true) + expectJobResult(Array(42)) } test("run trivial job w/ dependency") { val baseRdd = makeRdd(1, Nil) val finalRdd = makeRdd(1, List(new OneToOneDependency(baseRdd))) - resetExpecting { - expectGetLocations() - expectStage(finalRdd, List( (Success, 42) )) - } - whenExecuting { - assert(submitRdd(finalRdd) === Array(42)) - } + val taskSet = interceptStage(finalRdd) { submitRdd(finalRdd) } + respondToTaskSet(taskSet, List( (Success, 42) )) + expectJobResult(Array(42)) } - test("location preferences w/ dependency") { + test("cache location preferences w/ dependency") { val baseRdd = makeRdd(1, Nil) val finalRdd = makeRdd(1, List(new 
OneToOneDependency(baseRdd))) - resetExpecting { - expectGetLocations() - cacheLocations(baseRdd.id -> 0) = - Seq(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")) - expectStage(finalRdd, List( (Success, 42) ), - Some(List(Seq("hostA", "hostB")))) - } - whenExecuting { - assert(submitRdd(finalRdd) === Array(42)) - } + cacheLocations(baseRdd.id -> 0) = + Seq(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")) + val taskSet = interceptStage(finalRdd) { submitRdd(finalRdd) } + expectTaskSetLocations(taskSet, List(Seq("hostA", "hostB"))) + respondToTaskSet(taskSet, List( (Success, 42) )) + expectJobResult(Array(42)) } test("trivial job failure") { val rdd = makeRdd(1, Nil) - resetExpecting { - expectGetLocations() - expectStageAnd(rdd, List()) { taskSet => scheduler.taskSetFailed(taskSet, "test failure") } - } - whenExecuting(taskScheduler, blockManagerMaster) { - intercept[SparkException] { submitRdd(rdd) } - } + val taskSet = interceptStage(rdd) { submitRdd(rdd) } + runEvent(TaskSetFailed(taskSet, "test failure")) + expectJobException() } test("run trivial shuffle") { @@ -304,20 +345,17 @@ class DAGSchedulerSuite extends FunSuite val shuffleId = shuffleDep.shuffleId val reduceRdd = makeRdd(1, List(shuffleDep)) - resetExpecting { - expectGetLocations() - expectStage(shuffleMapRdd, List( + val firstStage = interceptStage(shuffleMapRdd) { submitRdd(reduceRdd) } + val secondStage = interceptStage(reduceRdd) { + respondToTaskSet(firstStage, List( (Success, makeMapStatus("hostA", 1)), (Success, makeMapStatus("hostB", 1)) )) - expectStageAnd(reduceRdd, List( (Success, 42) )) { _ => - w { assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === - Array(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))) } - } - } - whenExecuting { - assert(submitRdd(reduceRdd) === Array(42)) } + assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === + Array(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))) + respondToTaskSet(secondStage, List( (Success, 42) )) + expectJobResult(Array(42)) } test("run trivial shuffle with fetch failure") { @@ -326,28 +364,32 @@ class DAGSchedulerSuite extends FunSuite val shuffleId = shuffleDep.shuffleId val reduceRdd = makeRdd(2, List(shuffleDep)) - resetExpecting { - expectGetLocations() - expectStage(shuffleMapRdd, List( + val firstStage = interceptStage(shuffleMapRdd) { submitRdd(reduceRdd) } + val secondStage = interceptStage(reduceRdd) { + respondToTaskSet(firstStage, List( (Success, makeMapStatus("hostA", 1)), (Success, makeMapStatus("hostB", 1)) )) + } + resetExpecting { blockManagerMaster.removeExecutor("exec-hostA") - expectStage(reduceRdd, List( + } + whenExecuting { + respondToTaskSet(secondStage, List( (Success, 42), (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0), null) )) - // partial recompute - expectStage(shuffleMapRdd, List( (Success, makeMapStatus("hostA", 1)) )) - expectStageAnd(reduceRdd, List( (Success, 43) )) { _ => - w { assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === - Array(makeBlockManagerId("hostA"), - makeBlockManagerId("hostB"))) } - } } - whenExecuting { - assert(submitRdd(reduceRdd) === Array(42, 43)) + val thirdStage = interceptStage(shuffleMapRdd) { + scheduler.resubmitFailedStages() + } + val fourthStage = interceptStage(reduceRdd) { + respondToTaskSet(thirdStage, List( (Success, makeMapStatus("hostA", 1)) )) } + assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === + Array(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))) + 
respondToTaskSet(fourthStage, List( (Success, 43) )) + expectJobResult(Array(42, 43)) } test("ignore late map task completions") { @@ -356,63 +398,64 @@ class DAGSchedulerSuite extends FunSuite val shuffleId = shuffleDep.shuffleId val reduceRdd = makeRdd(2, List(shuffleDep)) + val taskSet = interceptStage(shuffleMapRdd) { submitRdd(reduceRdd) } + val oldGeneration = mapOutputTracker.getGeneration resetExpecting { - expectGetLocations() - expectStageAnd(shuffleMapRdd, List( - (Success, makeMapStatus("hostA", 1)) - )) { taskSet => - val newGeneration = mapOutputTracker.getGeneration + 1 - scheduler.executorLost("exec-hostA") - val noAccum = Map[Long, Any]() - // We rely on the event queue being ordered and increasing the generation number by 1 - // should be ignored for being too old - scheduler.taskEnded(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum) - // should work because it's a non-failed host - scheduler.taskEnded(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), noAccum) - // should be ignored for being too old - scheduler.taskEnded(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum) - // should be ignored (not end the stage) because it's too old - scheduler.taskEnded(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), noAccum) - taskSet.tasks(1).generation = newGeneration - scheduler.taskEnded(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), noAccum) - } blockManagerMaster.removeExecutor("exec-hostA") - expectStageAnd(reduceRdd, List( - (Success, 42), (Success, 43) - )) { _ => - w { assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === - Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA"))) } - } } whenExecuting { - assert(submitRdd(reduceRdd) === Array(42, 43)) - } + runEvent(ExecutorLost("exec-hostA")) + } + val newGeneration = mapOutputTracker.getGeneration + assert(newGeneration > oldGeneration) + val noAccum = Map[Long, Any]() + // We rely on the event queue being ordered and increasing the generation number by 1 + // should be ignored for being too old + runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum)) + // should work because it's a non-failed host + runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), noAccum)) + // should be ignored for being too old + runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum)) + taskSet.tasks(1).generation = newGeneration + val secondStage = interceptStage(reduceRdd) { + runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), noAccum)) + } + assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === + Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA"))) + respondToTaskSet(secondStage, List( (Success, 42), (Success, 43) )) + expectJobResult(Array(42, 43)) } - test("run trivial shuffle with out-of-band failure") { + test("run trivial shuffle with out-of-band failure and retry") { val shuffleMapRdd = makeRdd(2, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) val shuffleId = shuffleDep.shuffleId val reduceRdd = makeRdd(1, List(shuffleDep)) + + val firstStage = interceptStage(shuffleMapRdd) { submitRdd(reduceRdd) } resetExpecting { - expectGetLocations() blockManagerMaster.removeExecutor("exec-hostA") - expectStageAnd(shuffleMapRdd, List( + } + whenExecuting { + runEvent(ExecutorLost("exec-hostA")) + } + // DAGScheduler will immediately resubmit the stage after it appears to have no pending tasks + // rather than marking it is 
as failed and waiting. + val secondStage = interceptStage(shuffleMapRdd) { + respondToTaskSet(firstStage, List( (Success, makeMapStatus("hostA", 1)), (Success, makeMapStatus("hostB", 1)) - )) { _ => scheduler.executorLost("exec-hostA") } - expectStage(shuffleMapRdd, List( - (Success, makeMapStatus("hostC", 1)) )) - expectStageAnd(reduceRdd, List( (Success, 42) )) { _ => - w { assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === - Array(makeBlockManagerId("hostC"), - makeBlockManagerId("hostB"))) } - } } - whenExecuting { - assert(submitRdd(reduceRdd) === Array(42)) + val thirdStage = interceptStage(reduceRdd) { + respondToTaskSet(secondStage, List( + (Success, makeMapStatus("hostC", 1)) + )) } + assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === + Array(makeBlockManagerId("hostC"), makeBlockManagerId("hostB"))) + respondToTaskSet(thirdStage, List( (Success, 42) )) + expectJobResult(Array(42)) } test("recursive shuffle failures") { @@ -422,34 +465,42 @@ class DAGSchedulerSuite extends FunSuite val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null) val finalRdd = makeRdd(1, List(shuffleDepTwo)) - resetExpecting { - expectGetLocations() - expectStage(shuffleOneRdd, List( - (Success, makeMapStatus("hostA", 1)), - (Success, makeMapStatus("hostB", 1)) + val firstStage = interceptStage(shuffleOneRdd) { submitRdd(finalRdd) } + val secondStage = interceptStage(shuffleTwoRdd) { + respondToTaskSet(firstStage, List( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2)) )) - expectStage(shuffleTwoRdd, List( + } + val thirdStage = interceptStage(finalRdd) { + respondToTaskSet(secondStage, List( (Success, makeMapStatus("hostA", 1)), (Success, makeMapStatus("hostC", 1)) )) + } + resetExpecting { blockManagerMaster.removeExecutor("exec-hostA") - expectStage(finalRdd, List( + } + whenExecuting { + respondToTaskSet(thirdStage, List( (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null) )) - // triggers a partial recompute of the first stage, then the second - expectStage(shuffleOneRdd, List( - (Success, makeMapStatus("hostA", 1)) + } + val recomputeOne = interceptStage(shuffleOneRdd) { + scheduler.resubmitFailedStages + } + val recomputeTwo = interceptStage(shuffleTwoRdd) { + respondToTaskSet(recomputeOne, List( + (Success, makeMapStatus("hostA", 2)) )) - expectStage(shuffleTwoRdd, List( + } + val finalStage = interceptStage(finalRdd) { + respondToTaskSet(recomputeTwo, List( (Success, makeMapStatus("hostA", 1)) )) - expectStage(finalRdd, List( - (Success, 42) - )) - } - whenExecuting { - assert(submitRdd(finalRdd) === Array(42)) } + respondToTaskSet(finalStage, List( (Success, 42) )) + expectJobResult(Array(42)) } test("cached post-shuffle") { @@ -459,35 +510,41 @@ class DAGSchedulerSuite extends FunSuite val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null) val finalRdd = makeRdd(1, List(shuffleDepTwo)) - resetExpecting { - expectGetLocations() - expectStage(shuffleOneRdd, List( + val firstShuffleStage = interceptStage(shuffleOneRdd) { submitRdd(finalRdd) } + cacheLocations(shuffleTwoRdd.id -> 0) = Seq(makeBlockManagerId("hostD")) + cacheLocations(shuffleTwoRdd.id -> 1) = Seq(makeBlockManagerId("hostC")) + val secondShuffleStage = interceptStage(shuffleTwoRdd) { + respondToTaskSet(firstShuffleStage, List( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2)) + )) + } + val reduceStage = interceptStage(finalRdd) { + respondToTaskSet(secondShuffleStage, List( (Success, 
makeMapStatus("hostA", 1)), (Success, makeMapStatus("hostB", 1)) )) - expectStageAnd(shuffleTwoRdd, List( - (Success, makeMapStatus("hostA", 1)), - (Success, makeMapStatus("hostC", 1)) - )){ _ => - cacheLocations(shuffleTwoRdd.id -> 0) = Seq(makeBlockManagerId("hostD")) - cacheLocations(shuffleTwoRdd.id -> 1) = Seq(makeBlockManagerId("hostC")) - } + } + resetExpecting { blockManagerMaster.removeExecutor("exec-hostA") - expectStage(finalRdd, List( + } + whenExecuting { + respondToTaskSet(reduceStage, List( (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null) )) - // since we have a cached copy of the missing split of shuffleTwoRdd, we shouldn't - // immediately try to rerun shuffleOneRdd: - expectStage(shuffleTwoRdd, List( + } + // DAGScheduler should notice the cached copy of the second shuffle and try to get it rerun. + val recomputeTwo = interceptStage(shuffleTwoRdd) { + scheduler.resubmitFailedStages() + } + expectTaskSetLocations(recomputeTwo, Seq(Seq("hostD"))) + val finalRetry = interceptStage(finalRdd) { + respondToTaskSet(recomputeTwo, List( (Success, makeMapStatus("hostD", 1)) - ), Some(Seq(List("hostD")))) - expectStage(finalRdd, List( - (Success, 42) )) } - whenExecuting { - assert(submitRdd(finalRdd) === Array(42)) - } + respondToTaskSet(finalRetry, List( (Success, 42) )) + expectJobResult(Array(42)) } test("cached post-shuffle but fails") { @@ -497,45 +554,58 @@ class DAGSchedulerSuite extends FunSuite val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null) val finalRdd = makeRdd(1, List(shuffleDepTwo)) - resetExpecting { - expectGetLocations() - expectStage(shuffleOneRdd, List( + val firstShuffleStage = interceptStage(shuffleOneRdd) { submitRdd(finalRdd) } + cacheLocations(shuffleTwoRdd.id -> 0) = Seq(makeBlockManagerId("hostD")) + cacheLocations(shuffleTwoRdd.id -> 1) = Seq(makeBlockManagerId("hostC")) + val secondShuffleStage = interceptStage(shuffleTwoRdd) { + respondToTaskSet(firstShuffleStage, List( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2)) + )) + } + val reduceStage = interceptStage(finalRdd) { + respondToTaskSet(secondShuffleStage, List( (Success, makeMapStatus("hostA", 1)), (Success, makeMapStatus("hostB", 1)) )) - expectStageAnd(shuffleTwoRdd, List( - (Success, makeMapStatus("hostA", 1)), - (Success, makeMapStatus("hostC", 1)) - )){ _ => - cacheLocations(shuffleTwoRdd.id -> 0) = Seq(makeBlockManagerId("hostD")) - cacheLocations(shuffleTwoRdd.id -> 1) = Seq(makeBlockManagerId("hostC")) - } + } + resetExpecting { blockManagerMaster.removeExecutor("exec-hostA") - expectStage(finalRdd, List( + } + whenExecuting { + respondToTaskSet(reduceStage, List( (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null) )) - // since we have a cached copy of the missing split of shuffleTwoRdd, we shouldn't - // immediately try to rerun shuffleOneRdd: - expectStageAnd(shuffleTwoRdd, List( - (FetchFailed(null, shuffleDepOne.shuffleId, 0, 0), null) - ), Some(Seq(List("hostD")))) { _ => - w { - intercept[FetchFailedException]{ - mapOutputTracker.getServerStatuses(shuffleDepOne.shuffleId, 0) - } - } - cacheLocations.remove(shuffleTwoRdd.id -> 0) - } - // after that fetch failure, we should refetch the cache locations and try to recompute - // the whole chain. Note that we will ignore that a fetch failure previously occured on - // this host. 
- expectStage(shuffleOneRdd, List( (Success, makeMapStatus("hostA", 1)) )) - expectStage(shuffleTwoRdd, List( (Success, makeMapStatus("hostA", 1)) )) - expectStage(finalRdd, List( (Success, 42) )) } - whenExecuting { - assert(submitRdd(finalRdd) === Array(42)) + val recomputeTwoCached = interceptStage(shuffleTwoRdd) { + scheduler.resubmitFailedStages() + } + expectTaskSetLocations(recomputeTwoCached, Seq(Seq("hostD"))) + intercept[FetchFailedException]{ + mapOutputTracker.getServerStatuses(shuffleDepOne.shuffleId, 0) + } + + // Simulate the shuffle input data failing to be cached. + cacheLocations.remove(shuffleTwoRdd.id -> 0) + respondToTaskSet(recomputeTwoCached, List( + (FetchFailed(null, shuffleDepOne.shuffleId, 0, 0), null) + )) + + // After the fetch failure, DAGScheduler should recheck the cache and decide to resubmit + // everything. + val recomputeOne = interceptStage(shuffleOneRdd) { + scheduler.resubmitFailedStages() } + // We use hostA here to make sure DAGScheduler doesn't think it's still dead. + val recomputeTwoUncached = interceptStage(shuffleTwoRdd) { + respondToTaskSet(recomputeOne, List( (Success, makeMapStatus("hostA", 1)) )) + } + expectTaskSetLocations(recomputeTwoUncached, Seq(Seq[String]())) + val finalRetry = interceptStage(finalRdd) { + respondToTaskSet(recomputeTwoUncached, List( (Success, makeMapStatus("hostA", 1)) )) + + } + respondToTaskSet(finalRetry, List( (Success, 42) )) + expectJobResult(Array(42)) } } - -- cgit v1.2.3 From 7f51458774ce4561f1df3ba9b68704c3f63852f3 Mon Sep 17 00:00:00 2001 From: Charles Reiss Date: Wed, 30 Jan 2013 09:34:53 -0800 Subject: Comment at top of DAGSchedulerSuite --- .../test/scala/spark/scheduler/DAGSchedulerSuite.scala | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) (limited to 'core') diff --git a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala index 89173540d4..c31e2e7064 100644 --- a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala @@ -30,9 +30,22 @@ import spark.TaskEndReason import spark.{FetchFailed, Success} +/** + * Tests for DAGScheduler. These tests directly call the event processing functinos in DAGScheduler + * rather than spawning an event loop thread as happens in the real code. They use EasyMock + * to mock out two classes that DAGScheduler interacts with: TaskScheduler (to which TaskSets are + * submitted) and BlockManagerMaster (from which cache locations are retrieved and to which dead + * host notifications are sent). In addition, tests may check for side effects on a non-mocked + * MapOutputTracker instance. + * + * Tests primarily consist of running DAGScheduler#processEvent and + * DAGScheduler#submitWaitingStages (via test utility functions like runEvent or respondToTaskSet) + * and capturing the resulting TaskSets from the mock TaskScheduler. + */ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar with TimeLimitedTests { - // impose a time limit on this test in case we don't let the job finish. + // impose a time limit on this test in case we don't let the job finish, in which case + // JobWaiter#getResult will hang. 
override val timeLimit = Span(5, Seconds) val sc: SparkContext = new SparkContext("local", "DAGSchedulerSuite") -- cgit v1.2.3 From c1df24d0850b0ac89f35f1a47ce6b2fb5b95df0a Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Wed, 30 Jan 2013 18:51:14 -0800 Subject: rename Slaves --> Executor --- core/src/main/scala/spark/SparkContext.scala | 6 +++--- core/src/main/scala/spark/storage/BlockManagerUI.scala | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index a09eca1dd0..39e3555de8 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -468,7 +468,7 @@ class SparkContext( * Return a map from the slave to the max memory available for caching and the remaining * memory available for caching. */ - def getSlavesMemoryStatus: Map[String, (Long, Long)] = { + def getExecutorMemoryStatus: Map[String, (Long, Long)] = { env.blockManager.master.getMemoryStatus.map { case(blockManagerId, mem) => (blockManagerId.ip + ":" + blockManagerId.port, mem) } @@ -479,13 +479,13 @@ class SparkContext( * they take, etc. */ def getRDDStorageInfo : Array[RDDInfo] = { - StorageUtils.rddInfoFromStorageStatus(getSlavesStorageStatus, this) + StorageUtils.rddInfoFromStorageStatus(getExecutorStorageStatus, this) } /** * Return information about blocks stored in all of the slaves */ - def getSlavesStorageStatus : Array[StorageStatus] = { + def getExecutorStorageStatus : Array[StorageStatus] = { env.blockManager.master.getStorageStatus } diff --git a/core/src/main/scala/spark/storage/BlockManagerUI.scala b/core/src/main/scala/spark/storage/BlockManagerUI.scala index 52f6d1b657..9e6721ec17 100644 --- a/core/src/main/scala/spark/storage/BlockManagerUI.scala +++ b/core/src/main/scala/spark/storage/BlockManagerUI.scala @@ -45,7 +45,7 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef, path("") { completeWith { // Request the current storage status from the Master - val storageStatusList = sc.getSlavesStorageStatus + val storageStatusList = sc.getExecutorStorageStatus // Calculate macro-level statistics val maxMem = storageStatusList.map(_.maxMem).reduce(_+_) val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_) @@ -60,7 +60,7 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef, parameter("id") { id => completeWith { val prefix = "rdd_" + id.toString - val storageStatusList = sc.getSlavesStorageStatus + val storageStatusList = sc.getExecutorStorageStatus val filteredStorageStatusList = StorageUtils. filterStorageStatusByPrefix(storageStatusList, prefix) val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head -- cgit v1.2.3 From 782187c21047ee31728bdb173a2b7ee708cef77b Mon Sep 17 00:00:00 2001 From: Stephen Haberman Date: Thu, 31 Jan 2013 18:27:25 -0600 Subject: Once we find a split with no block, we don't have to look for more. 
--- .../main/scala/spark/scheduler/DAGScheduler.scala | 23 +++++++++++----------- 1 file changed, 11 insertions(+), 12 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index b130be6a38..b62b25f688 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -177,18 +177,17 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with if (!visited(rdd)) { visited += rdd val locs = getCacheLocs(rdd) - for (p <- 0 until rdd.splits.size) { - if (locs(p) == Nil) { - for (dep <- rdd.dependencies) { - dep match { - case shufDep: ShuffleDependency[_,_] => - val mapStage = getShuffleMapStage(shufDep, stage.priority) - if (!mapStage.isAvailable) { - missing += mapStage - } - case narrowDep: NarrowDependency[_] => - visit(narrowDep.rdd) - } + val atLeastOneMissing = (0 until rdd.splits.size).exists(locs(_) == Nil) + if (atLeastOneMissing) { + for (dep <- rdd.dependencies) { + dep match { + case shufDep: ShuffleDependency[_,_] => + val mapStage = getShuffleMapStage(shufDep, stage.priority) + if (!mapStage.isAvailable) { + missing += mapStage + } + case narrowDep: NarrowDependency[_] => + visit(narrowDep.rdd) } } } -- cgit v1.2.3 From f127f2ae76692b189d86b5a47293579d5657c6d5 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Fri, 1 Feb 2013 00:20:49 -0800 Subject: fixup merge (master -> driver renaming) --- core/src/main/scala/spark/storage/BlockManagerMaster.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'core') diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala index 99324445ca..0372cb080a 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala @@ -118,7 +118,7 @@ private[spark] class BlockManagerMaster( } def getStorageStatus: Array[StorageStatus] = { - askMasterWithRetry[ArrayBuffer[StorageStatus]](GetStorageStatus).toArray + askDriverWithReply[ArrayBuffer[StorageStatus]](GetStorageStatus).toArray } /** Stop the driver actor, called only on the Spark driver node */ -- cgit v1.2.3 From 8a0a5ed53353ad6aa5656eb729d55ca7af2ab096 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Fri, 1 Feb 2013 00:23:38 -0800 Subject: track total partitions, in addition to cached partitions; use scala string formatting --- core/src/main/scala/spark/storage/StorageUtils.scala | 10 ++++------ core/src/main/twirl/spark/storage/rdd.scala.html | 6 +++++- core/src/main/twirl/spark/storage/rdd_table.scala.html | 6 ++++-- 3 files changed, 13 insertions(+), 9 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/spark/storage/StorageUtils.scala b/core/src/main/scala/spark/storage/StorageUtils.scala index ce7c067eea..5367b74bb6 100644 --- a/core/src/main/scala/spark/storage/StorageUtils.scala +++ b/core/src/main/scala/spark/storage/StorageUtils.scala @@ -22,12 +22,11 @@ case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long, } case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel, - numPartitions: Int, memSize: Long, diskSize: Long) { + numCachedPartitions: Int, numPartitions: Int, memSize: Long, diskSize: Long) { override def toString = { import Utils.memoryBytesToString - import java.lang.{Integer => JInt} - String.format("RDD \"%s\" (%d) Storage: %s; Partitions: %d; MemorySize: %s; DiskSize: %s", name, 
id.asInstanceOf[JInt], - storageLevel.toString, numPartitions.asInstanceOf[JInt], memoryBytesToString(memSize), memoryBytesToString(diskSize)) + "RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s; DiskSize: %s".format(name, id, + storageLevel.toString, numCachedPartitions, numPartitions, memoryBytesToString(memSize), memoryBytesToString(diskSize)) } } @@ -65,9 +64,8 @@ object StorageUtils { val rdd = sc.persistentRdds(rddId) val rddName = Option(rdd.name).getOrElse(rddKey) val rddStorageLevel = rdd.getStorageLevel - //TODO get total number of partitions in rdd - RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, memSize, diskSize) + RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, rdd.splits.size, memSize, diskSize) }.toArray } diff --git a/core/src/main/twirl/spark/storage/rdd.scala.html b/core/src/main/twirl/spark/storage/rdd.scala.html index ac7f8c981f..d85addeb17 100644 --- a/core/src/main/twirl/spark/storage/rdd.scala.html +++ b/core/src/main/twirl/spark/storage/rdd.scala.html @@ -11,7 +11,11 @@ Storage Level: @(rddInfo.storageLevel.description)
  • - Partitions: + Cached Partitions: + @(rddInfo.numCachedPartitions) +
  • +
  • + Total Partitions: @(rddInfo.numPartitions)
  • diff --git a/core/src/main/twirl/spark/storage/rdd_table.scala.html b/core/src/main/twirl/spark/storage/rdd_table.scala.html index af801cf229..a51e64aed0 100644 --- a/core/src/main/twirl/spark/storage/rdd_table.scala.html +++ b/core/src/main/twirl/spark/storage/rdd_table.scala.html @@ -6,7 +6,8 @@ RDD Name Storage Level - Partitions + Cached Partitions + Fraction Partitions Cached Size in Memory Size on Disk @@ -21,7 +22,8 @@ @(rdd.storageLevel.description) - @rdd.numPartitions + @rdd.numCachedPartitions + @(rdd.numCachedPartitions / rdd.numPartitions.toDouble) @{Utils.memoryBytesToString(rdd.memSize)} @{Utils.memoryBytesToString(rdd.diskSize)} -- cgit v1.2.3 From c6190067ae40cf457b7f2e58619904b6fd2b1cb6 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Fri, 1 Feb 2013 09:55:25 -0800 Subject: remove unneeded (and unused) filter on block info --- core/src/main/scala/spark/storage/StorageUtils.scala | 2 -- 1 file changed, 2 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/spark/storage/StorageUtils.scala b/core/src/main/scala/spark/storage/StorageUtils.scala index 5367b74bb6..5f72b67b2c 100644 --- a/core/src/main/scala/spark/storage/StorageUtils.scala +++ b/core/src/main/scala/spark/storage/StorageUtils.scala @@ -43,8 +43,6 @@ object StorageUtils { /* Given a list of BlockStatus objets, returns information for each RDD */ def rddInfoFromBlockStatusList(infos: Map[String, BlockStatus], sc: SparkContext) : Array[RDDInfo] = { - // Find all RDD Blocks (ignore broadcast variables) - val rddBlocks = infos.filterKeys(_.startsWith("rdd")) // Group by rddId, ignore the partition name val groupedRddBlocks = infos.groupBy { case(k, v) => -- cgit v1.2.3 From 1fd5ee323d127499bb3f173d4142c37532ec29b2 Mon Sep 17 00:00:00 2001 From: Charles Reiss Date: Fri, 1 Feb 2013 22:33:38 -0800 Subject: Code review changes: add sc.stop; style of multiline comments; parens on procedure calls. --- .../scala/spark/scheduler/DAGSchedulerSuite.scala | 69 +++++++++++++++------- 1 file changed, 47 insertions(+), 22 deletions(-) (limited to 'core') diff --git a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala index c31e2e7064..adce1f38bb 100644 --- a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala @@ -31,7 +31,7 @@ import spark.TaskEndReason import spark.{FetchFailed, Success} /** - * Tests for DAGScheduler. These tests directly call the event processing functinos in DAGScheduler + * Tests for DAGScheduler. These tests directly call the event processing functions in DAGScheduler * rather than spawning an event loop thread as happens in the real code. They use EasyMock * to mock out two classes that DAGScheduler interacts with: TaskScheduler (to which TaskSets are * submitted) and BlockManagerMaster (from which cache locations are retrieved and to which dead @@ -56,29 +56,34 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar var schedulerThread: Thread = null var schedulerException: Throwable = null - /** Set of EasyMock argument matchers that match a TaskSet for a given RDD. + /** + * Set of EasyMock argument matchers that match a TaskSet for a given RDD. * We cache these so we do not create duplicate matchers for the same RDD. * This allows us to easily setup a sequence of expectations for task sets for * that RDD. 
*/ val taskSetMatchers = new HashMap[MyRDD, IArgumentMatcher] - /** Set of cache locations to return from our mock BlockManagerMaster. + /** + * Set of cache locations to return from our mock BlockManagerMaster. * Keys are (rdd ID, partition ID). Anything not present will return an empty * list of cache locations silently. */ val cacheLocations = new HashMap[(Int, Int), Seq[BlockManagerId]] - /** JobWaiter for the last JobSubmitted event we pushed. To keep tests (most of which + /** + * JobWaiter for the last JobSubmitted event we pushed. To keep tests (most of which * will only submit one job) from needing to explicitly track it. */ var lastJobWaiter: JobWaiter = null - /** Tell EasyMockSugar what mock objects we want to be configured by expecting {...} + /** + * Tell EasyMockSugar what mock objects we want to be configured by expecting {...} * and whenExecuting {...} */ implicit val mocks = MockObjects(taskScheduler, blockManagerMaster) - /** Utility function to reset mocks and set expectations on them. EasyMock wants mock objects + /** + * Utility function to reset mocks and set expectations on them. EasyMock wants mock objects * to be reset after each time their expectations are set, and we tend to check mock object * calls over a single call to DAGScheduler. * @@ -115,17 +120,21 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar whenExecuting { scheduler.stop() } + sc.stop() System.clearProperty("spark.master.port") } def makeBlockManagerId(host: String): BlockManagerId = BlockManagerId("exec-" + host, host, 12345) - /** Type of RDD we use for testing. Note that we should never call the real RDD compute methods. - * This is a pair RDD type so it can always be used in ShuffleDependencies. */ + /** + * Type of RDD we use for testing. Note that we should never call the real RDD compute methods. + * This is a pair RDD type so it can always be used in ShuffleDependencies. + */ type MyRDD = RDD[(Int, Int)] - /** Create an RDD for passing to DAGScheduler. These RDDs will use the dependencies and + /** + * Create an RDD for passing to DAGScheduler. These RDDs will use the dependencies and * preferredLocations (if any) that are passed to them. They are deliberately not executable * so we can test that DAGScheduler does not try to execute RDDs locally. */ @@ -150,7 +159,8 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar } } - /** EasyMock matcher method. For use as an argument matcher for a TaskSet whose first task + /** + * EasyMock matcher method. For use as an argument matcher for a TaskSet whose first task * is from a particular RDD. */ def taskSetForRdd(rdd: MyRDD): TaskSet = { @@ -172,7 +182,8 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar return null } - /** Setup an EasyMock expectation to repsond to blockManagerMaster.getLocations() called from + /** + * Setup an EasyMock expectation to repsond to blockManagerMaster.getLocations() called from * cacheLocations. */ def expectGetLocations(): Unit = { @@ -197,7 +208,8 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar }).anyTimes() } - /** Process the supplied event as if it were the top of the DAGScheduler event queue, expecting + /** + * Process the supplied event as if it were the top of the DAGScheduler event queue, expecting * the scheduler not to exit. 
* * After processing the event, submit waiting stages as is done on most iterations of the @@ -208,7 +220,8 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar scheduler.submitWaitingStages() } - /** Expect a TaskSet for the specified RDD to be submitted to the TaskScheduler. Should be + /** + * Expect a TaskSet for the specified RDD to be submitted to the TaskScheduler. Should be * called from a resetExpecting { ... } block. * * Returns a easymock Capture that will contain the task set after the stage is submitted. @@ -220,7 +233,8 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar return taskSetCapture } - /** Expect the supplied code snippet to submit a stage for the specified RDD. + /** + * Expect the supplied code snippet to submit a stage for the specified RDD. * Return the resulting TaskSet. First marks all the tasks are belonging to the * current MapOutputTracker generation. */ @@ -239,7 +253,9 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar return taskSet } - /** Send the given CompletionEvent messages for the tasks in the TaskSet. */ + /** + * Send the given CompletionEvent messages for the tasks in the TaskSet. + */ def respondToTaskSet(taskSet: TaskSet, results: Seq[(TaskEndReason, Any)]) { assert(taskSet.tasks.size >= results.size) for ((result, i) <- results.zipWithIndex) { @@ -249,7 +265,9 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar } } - /** Assert that the supplied TaskSet has exactly the given preferredLocations. */ + /** + * Assert that the supplied TaskSet has exactly the given preferredLocations. + */ def expectTaskSetLocations(taskSet: TaskSet, locations: Seq[Seq[String]]) { assert(locations.size === taskSet.tasks.size) for ((expectLocs, taskLocs) <- @@ -258,7 +276,8 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar } } - /** When we submit dummy Jobs, this is the compute function we supply. Except in a local test + /** + * When we submit dummy Jobs, this is the compute function we supply. Except in a local test * below, we do not expect this function to ever be executed; instead, we will return results * directly through CompletionEvents. */ @@ -266,8 +285,10 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar it.next._1.asInstanceOf[Int] - /** Start a job to compute the given RDD. Returns the JobWaiter that will - * collect the result of the job via callbacks from DAGScheduler. */ + /** + * Start a job to compute the given RDD. Returns the JobWaiter that will + * collect the result of the job via callbacks from DAGScheduler. + */ def submitRdd(rdd: MyRDD, allowLocal: Boolean = false): JobWaiter = { val (toSubmit, waiter) = scheduler.prepareJob[(Int, Int), Int]( rdd, @@ -281,7 +302,9 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar return waiter } - /** Assert that a job we started has failed. */ + /** + * Assert that a job we started has failed. + */ def expectJobException(waiter: JobWaiter = lastJobWaiter) { waiter.getResult match { case JobSucceeded(_) => fail() @@ -289,7 +312,9 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar } } - /** Assert that a job we started has succeeded and has the given result. */ + /** + * Assert that a job we started has succeeded and has the given result. 
+ */ def expectJobResult(expected: Array[Int], waiter: JobWaiter = lastJobWaiter) { waiter.getResult match { case JobSucceeded(answer) => @@ -500,7 +525,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar )) } val recomputeOne = interceptStage(shuffleOneRdd) { - scheduler.resubmitFailedStages + scheduler.resubmitFailedStages() } val recomputeTwo = interceptStage(shuffleTwoRdd) { respondToTaskSet(recomputeOne, List( -- cgit v1.2.3 From 28e0cb9f312b7fb1b0236fd15ba0dd2f423e826d Mon Sep 17 00:00:00 2001 From: Stephen Haberman Date: Sat, 2 Feb 2013 01:11:37 -0600 Subject: Fix createActorSystem not actually using the systemName parameter. This meant all system names were "spark", which worked, but didn't lead to the most intuitive log output. This fixes createActorSystem to use the passed system name, and refactors Master/Worker to encapsulate their system/actor names instead of having the clients guess at them. Note that the driver system name, "spark", is left as is, and is still repeated a few times, but that seems like a separate issue. --- .../scala/spark/deploy/LocalSparkCluster.scala | 38 +++++--------- .../main/scala/spark/deploy/client/Client.scala | 13 ++--- .../main/scala/spark/deploy/master/Master.scala | 24 +++++++-- .../main/scala/spark/deploy/worker/Worker.scala | 58 ++++++++++------------ .../scala/spark/storage/BlockManagerMaster.scala | 2 - core/src/main/scala/spark/util/AkkaUtils.scala | 6 ++- 6 files changed, 68 insertions(+), 73 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala index 2836574ecb..22319a96ca 100644 --- a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala +++ b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala @@ -18,35 +18,23 @@ import scala.collection.mutable.ArrayBuffer private[spark] class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: Int) extends Logging { - val localIpAddress = Utils.localIpAddress + private val localIpAddress = Utils.localIpAddress + private val masterActorSystems = ArrayBuffer[ActorSystem]() + private val workerActorSystems = ArrayBuffer[ActorSystem]() - var masterActor : ActorRef = _ - var masterActorSystem : ActorSystem = _ - var masterPort : Int = _ - var masterUrl : String = _ - - val workerActorSystems = ArrayBuffer[ActorSystem]() - val workerActors = ArrayBuffer[ActorRef]() - - def start() : String = { + def start(): String = { logInfo("Starting a local Spark cluster with " + numWorkers + " workers.") /* Start the Master */ - val (actorSystem, masterPort) = AkkaUtils.createActorSystem("sparkMaster", localIpAddress, 0) - masterActorSystem = actorSystem - masterUrl = "spark://" + localIpAddress + ":" + masterPort - masterActor = masterActorSystem.actorOf( - Props(new Master(localIpAddress, masterPort, 0)), name = "Master") + val (masterSystem, masterPort) = Master.startSystemAndActor(localIpAddress, 0, 0) + masterActorSystems += masterSystem + val masterUrl = "spark://" + localIpAddress + ":" + masterPort - /* Start the Slaves */ + /* Start the Workers */ for (workerNum <- 1 to numWorkers) { - val (actorSystem, boundPort) = - AkkaUtils.createActorSystem("sparkWorker" + workerNum, localIpAddress, 0) - workerActorSystems += actorSystem - val actor = actorSystem.actorOf( - Props(new Worker(localIpAddress, boundPort, 0, coresPerWorker, memoryPerWorker, masterUrl)), - name = "Worker") - workerActors += actor + val (workerSystem, _) = 
Worker.startSystemAndActor(localIpAddress, 0, 0, coresPerWorker, + memoryPerWorker, masterUrl, null, Some(workerNum)) + workerActorSystems += workerSystem } return masterUrl @@ -57,7 +45,7 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I // Stop the workers before the master so they don't get upset that it disconnected workerActorSystems.foreach(_.shutdown()) workerActorSystems.foreach(_.awaitTermination()) - masterActorSystem.shutdown() - masterActorSystem.awaitTermination() + masterActorSystems.foreach(_.shutdown()) + masterActorSystems.foreach(_.awaitTermination()) } } diff --git a/core/src/main/scala/spark/deploy/client/Client.scala b/core/src/main/scala/spark/deploy/client/Client.scala index 90fe9508cd..a63eee1233 100644 --- a/core/src/main/scala/spark/deploy/client/Client.scala +++ b/core/src/main/scala/spark/deploy/client/Client.scala @@ -9,6 +9,7 @@ import spark.{SparkException, Logging} import akka.remote.RemoteClientLifeCycleEvent import akka.remote.RemoteClientShutdown import spark.deploy.RegisterJob +import spark.deploy.master.Master import akka.remote.RemoteClientDisconnected import akka.actor.Terminated import akka.dispatch.Await @@ -24,26 +25,18 @@ private[spark] class Client( listener: ClientListener) extends Logging { - val MASTER_REGEX = "spark://([^:]+):([0-9]+)".r - var actor: ActorRef = null var jobId: String = null - if (MASTER_REGEX.unapplySeq(masterUrl) == None) { - throw new SparkException("Invalid master URL: " + masterUrl) - } - class ClientActor extends Actor with Logging { var master: ActorRef = null var masterAddress: Address = null var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times override def preStart() { - val Seq(masterHost, masterPort) = MASTER_REGEX.unapplySeq(masterUrl).get - logInfo("Connecting to master spark://" + masterHost + ":" + masterPort) - val akkaUrl = "akka://spark@%s:%s/user/Master".format(masterHost, masterPort) + logInfo("Connecting to master " + masterUrl) try { - master = context.actorFor(akkaUrl) + master = context.actorFor(Master.toAkkaUrl(masterUrl)) masterAddress = master.path.address master ! RegisterJob(jobDescription) context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala index c618e87cdd..92e7914b1b 100644 --- a/core/src/main/scala/spark/deploy/master/Master.scala +++ b/core/src/main/scala/spark/deploy/master/Master.scala @@ -262,11 +262,29 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor } private[spark] object Master { + private val systemName = "sparkMaster" + private val actorName = "Master" + private val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r + def main(argStrings: Array[String]) { val args = new MasterArguments(argStrings) - val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", args.ip, args.port) - val actor = actorSystem.actorOf( - Props(new Master(args.ip, boundPort, args.webUiPort)), name = "Master") + val (actorSystem, _) = startSystemAndActor(args.ip, args.port, args.webUiPort) actorSystem.awaitTermination() } + + /** Returns an `akka://...` URL for the Master actor given a sparkUrl `spark://host:ip`. 
*/ + def toAkkaUrl(sparkUrl: String): String = { + sparkUrl match { + case sparkUrlRegex(host, port) => + "akka://%s@%s:%s/user/%s".format(systemName, host, port, actorName) + case _ => + throw new SparkException("Invalid master URL: " + sparkUrl) + } + } + + def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int) = { + val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port) + val actor = actorSystem.actorOf(Props(new Master(host, boundPort, webUiPort)), name = actorName) + (actorSystem, boundPort) + } } diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala index 8b41620d98..2219dd6262 100644 --- a/core/src/main/scala/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/spark/deploy/worker/Worker.scala @@ -1,7 +1,7 @@ package spark.deploy.worker import scala.collection.mutable.{ArrayBuffer, HashMap} -import akka.actor.{ActorRef, Props, Actor} +import akka.actor.{ActorRef, Props, Actor, ActorSystem} import spark.{Logging, Utils} import spark.util.AkkaUtils import spark.deploy._ @@ -13,6 +13,7 @@ import akka.remote.RemoteClientDisconnected import spark.deploy.RegisterWorker import spark.deploy.LaunchExecutor import spark.deploy.RegisterWorkerFailed +import spark.deploy.master.Master import akka.actor.Terminated import java.io.File @@ -27,7 +28,6 @@ private[spark] class Worker( extends Actor with Logging { val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For worker and executor IDs - val MASTER_REGEX = "spark://([^:]+):([0-9]+)".r var master: ActorRef = null var masterWebUiUrl : String = "" @@ -48,11 +48,7 @@ private[spark] class Worker( def memoryFree: Int = memory - memoryUsed def createWorkDir() { - workDir = if (workDirPath != null) { - new File(workDirPath) - } else { - new File(sparkHome, "work") - } + workDir = Option(workDirPath).map(new File(_)).getOrElse(new File(sparkHome, "work")) try { if (!workDir.exists() && !workDir.mkdirs()) { logError("Failed to create work directory " + workDir) @@ -68,8 +64,7 @@ private[spark] class Worker( override def preStart() { logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format( ip, port, cores, Utils.memoryMegabytesToString(memory))) - val envVar = System.getenv("SPARK_HOME") - sparkHome = new File(if (envVar == null) "." else envVar) + sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse(".")) logInfo("Spark home: " + sparkHome) createWorkDir() connectToMaster() @@ -77,24 +72,15 @@ private[spark] class Worker( } def connectToMaster() { - masterUrl match { - case MASTER_REGEX(masterHost, masterPort) => { - logInfo("Connecting to master spark://" + masterHost + ":" + masterPort) - val akkaUrl = "akka://spark@%s:%s/user/Master".format(masterHost, masterPort) - try { - master = context.actorFor(akkaUrl) - master ! RegisterWorker(workerId, ip, port, cores, memory, webUiPort, publicAddress) - context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) - context.watch(master) // Doesn't work with remote actors, but useful for testing - } catch { - case e: Exception => - logError("Failed to connect to master", e) - System.exit(1) - } - } - - case _ => - logError("Invalid master URL: " + masterUrl) + logInfo("Connecting to master " + masterUrl) + try { + master = context.actorFor(Master.toAkkaUrl(masterUrl)) + master ! 
RegisterWorker(workerId, ip, port, cores, memory, webUiPort, publicAddress) + context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) + context.watch(master) // Doesn't work with remote actors, but useful for testing + } catch { + case e: Exception => + logError("Failed to connect to master", e) System.exit(1) } } @@ -183,11 +169,19 @@ private[spark] class Worker( private[spark] object Worker { def main(argStrings: Array[String]) { val args = new WorkerArguments(argStrings) - val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", args.ip, args.port) - val actor = actorSystem.actorOf( - Props(new Worker(args.ip, boundPort, args.webUiPort, args.cores, args.memory, - args.master, args.workDir)), - name = "Worker") + val (actorSystem, _) = startSystemAndActor(args.ip, args.port, args.webUiPort, args.cores, + args.memory, args.master, args.workDir) actorSystem.awaitTermination() } + + def startSystemAndActor(host: String, port: Int, webUiPort: Int, cores: Int, memory: Int, + masterUrl: String, workDir: String, workerNumber: Option[Int] = None): (ActorSystem, Int) = { + // The LocalSparkCluster runs multiple local sparkWorkerX actor systems + val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("") + val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port) + val actor = actorSystem.actorOf(Props(new Worker(host, boundPort, webUiPort, cores, memory, + masterUrl, workDir)), name = "Worker") + (actorSystem, boundPort) + } + } diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala index 36398095a2..7be6b9fa87 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala @@ -27,8 +27,6 @@ private[spark] class BlockManagerMaster( val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "3000").toInt val DRIVER_AKKA_ACTOR_NAME = "BlockMasterManager" - val SLAVE_AKKA_ACTOR_NAME = "BlockSlaveManager" - val DEFAULT_MANAGER_IP: String = Utils.localHostName() val timeout = 10.seconds var driverActor: ActorRef = { diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala index e0fdeffbc4..3a3626e8a0 100644 --- a/core/src/main/scala/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/spark/util/AkkaUtils.scala @@ -18,9 +18,13 @@ import java.util.concurrent.TimeoutException * Various utility classes for working with Akka. */ private[spark] object AkkaUtils { + /** * Creates an ActorSystem ready for remoting, with various Spark features. Returns both the * ActorSystem itself and its port (which is hard to get from Akka). + * + * Note: the `name` parameter is important, as even if a client sends a message to right + * host + port, if the system name is incorrect, Akka will drop the message. */ def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = { val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt @@ -41,7 +45,7 @@ private[spark] object AkkaUtils { akka.actor.default-dispatcher.throughput = %d """.format(host, port, akkaTimeout, akkaFrameSize, akkaThreads, akkaBatchSize)) - val actorSystem = ActorSystem("spark", akkaConf, getClass.getClassLoader) + val actorSystem = ActorSystem(name, akkaConf, getClass.getClassLoader) // Figure out the port number we bound to, in case port was passed as 0. 
This is a bit of a // hack because Akka doesn't let you figure out the port through the public API yet. -- cgit v1.2.3 From 696eec32c982ca516c506de33f383a173bcbd131 Mon Sep 17 00:00:00 2001 From: Stephen Haberman Date: Sat, 2 Feb 2013 02:03:26 -0600 Subject: Move executorMemory up into SchedulerBackend. --- .../scala/spark/scheduler/cluster/SchedulerBackend.scala | 12 ++++++++++++ .../scheduler/cluster/SparkDeploySchedulerBackend.scala | 9 --------- .../spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala | 10 ---------- .../scala/spark/scheduler/mesos/MesosSchedulerBackend.scala | 10 ---------- 4 files changed, 12 insertions(+), 29 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala index ddcd64d7c6..9ac875de3a 100644 --- a/core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala @@ -1,5 +1,7 @@ package spark.scheduler.cluster +import spark.Utils + /** * A backend interface for cluster scheduling systems that allows plugging in different ones under * ClusterScheduler. We assume a Mesos-like model where the application gets resource offers as @@ -11,5 +13,15 @@ private[spark] trait SchedulerBackend { def reviveOffers(): Unit def defaultParallelism(): Int + // Memory used by each executor (in megabytes) + protected val executorMemory = { + // TODO: Might need to add some extra memory for the non-heap parts of the JVM + Option(System.getProperty("spark.executor.memory")) + .orElse(Option(System.getenv("SPARK_MEM"))) + .map(Utils.memoryStringToMb) + .getOrElse(512) + } + + // TODO: Probably want to add a killTask too } diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 2f7099c5b9..59ff8bcb90 100644 --- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -20,15 +20,6 @@ private[spark] class SparkDeploySchedulerBackend( val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt - // Memory used by each executor (in megabytes) - val executorMemory = { - // TODO: Might need to add some extra memory for the non-heap parts of the JVM - Option(System.getProperty("spark.executor.memory")) - .orElse(Option(System.getenv("SPARK_MEM"))) - .map(Utils.memoryStringToMb) - .getOrElse(512) - } - override def start() { super.start() diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala index 7bf56a05d6..b481ec0a72 100644 --- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala @@ -35,16 +35,6 @@ private[spark] class CoarseMesosSchedulerBackend( val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures - // Memory used by each executor (in megabytes) - val executorMemory = { - if (System.getenv("SPARK_MEM") != null) { - Utils.memoryStringToMb(System.getenv("SPARK_MEM")) - // TODO: Might need to add some extra memory for the non-heap parts of the JVM - } else { - 512 - } - } - // Lock used to wait for scheduler to be registered var isRegistered = false val registeredLock = new Object() diff --git 
a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala index eab1c60e0b..5c8b531de3 100644 --- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala @@ -29,16 +29,6 @@ private[spark] class MesosSchedulerBackend( with MScheduler with Logging { - // Memory used by each executor (in megabytes) - val EXECUTOR_MEMORY = { - if (System.getenv("SPARK_MEM") != null) { - Utils.memoryStringToMb(System.getenv("SPARK_MEM")) - // TODO: Might need to add some extra memory for the non-heap parts of the JVM - } else { - 512 - } - } - // Lock used to wait for scheduler to be registered var isRegistered = false val registeredLock = new Object() -- cgit v1.2.3 From cae8a6795c7f454b74c8d3c4425a6ced151d6d9b Mon Sep 17 00:00:00 2001 From: Stephen Haberman Date: Sat, 2 Feb 2013 02:15:39 -0600 Subject: Fix dangling old variable names. --- core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala index 5c8b531de3..300766d0f5 100644 --- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala @@ -79,7 +79,7 @@ private[spark] class MesosSchedulerBackend( val memory = Resource.newBuilder() .setName("mem") .setType(Value.Type.SCALAR) - .setScalar(Value.Scalar.newBuilder().setValue(EXECUTOR_MEMORY).build()) + .setScalar(Value.Scalar.newBuilder().setValue(executorMemory).build()) .build() val command = CommandInfo.newBuilder() .setValue(execScript) @@ -151,7 +151,7 @@ private[spark] class MesosSchedulerBackend( def enoughMemory(o: Offer) = { val mem = getResource(o.getResourcesList, "mem") val slaveId = o.getSlaveId.getValue - mem >= EXECUTOR_MEMORY || slaveIdsWithExecutors.contains(slaveId) + mem >= executorMemory || slaveIdsWithExecutors.contains(slaveId) } for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) { -- cgit v1.2.3 From 7aba123f0c0fd024105462b3a0b203cd357c67e9 Mon Sep 17 00:00:00 2001 From: Stephen Haberman Date: Sat, 2 Feb 2013 13:53:28 -0600 Subject: Further simplify checking for Nil. 
--- core/src/main/scala/spark/scheduler/DAGScheduler.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index b62b25f688..2a646dd0f5 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -176,9 +176,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with def visit(rdd: RDD[_]) { if (!visited(rdd)) { visited += rdd - val locs = getCacheLocs(rdd) - val atLeastOneMissing = (0 until rdd.splits.size).exists(locs(_) == Nil) - if (atLeastOneMissing) { + if (getCacheLocs(rdd).contains(Nil)) { for (dep <- rdd.dependencies) { dep match { case shufDep: ShuffleDependency[_,_] => -- cgit v1.2.3 From 34a7bcdb3a19deed18b25225daf47ff22ee20869 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Sat, 2 Feb 2013 19:40:30 -0800 Subject: Formatting --- core/src/main/scala/spark/scheduler/DAGScheduler.scala | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index 8cfc08e5ac..2a35915560 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -205,8 +205,9 @@ class DAGScheduler( missing.toList } - /** Returns (and does not) submit a JobSubmitted event suitable to run a given job, and - * a JobWaiter whose getResult() method will return the result of the job when it is complete. + /** + * Returns (and does not submit) a JobSubmitted event suitable to run a given job, and a + * JobWaiter whose getResult() method will return the result of the job when it is complete. * * The job is assumed to have at least one partition; zero partition jobs should be handled * without a JobSubmitted event. @@ -308,7 +309,8 @@ class DAGScheduler( return false } - /** Resubmit any failed stages. Ordinarily called after a small amount of time has passed since + /** + * Resubmit any failed stages. Ordinarily called after a small amount of time has passed since * the last fetch failure. */ private[scheduler] def resubmitFailedStages() { @@ -321,7 +323,8 @@ class DAGScheduler( } } - /** Check for waiting or failed stages which are now eligible for resubmission. + /** + * Check for waiting or failed stages which are now eligible for resubmission. * Ordinarily run on every iteration of the event loop. */ private[scheduler] def submitWaitingStages() { @@ -366,9 +369,9 @@ class DAGScheduler( // the same time, so we want to make sure we've identified all the reduce tasks that depend // on the failed node. if (failed.size > 0 && time > lastFetchFailureTime + RESUBMIT_TIMEOUT) { - resubmitFailedStages + resubmitFailedStages() } else { - submitWaitingStages + submitWaitingStages() } } } -- cgit v1.2.3 From 8fbd5380b7f36842297f624bad3a2513f7eca47b Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 3 Feb 2013 06:44:49 +0000 Subject: Fetch fewer objects in PySpark's take() method. 
--- core/src/main/scala/spark/api/python/PythonRDD.scala | 11 +++++++++-- python/pyspark/rdd.py | 4 ++++ 2 files changed, 13 insertions(+), 2 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala index 39758e94f4..ab8351e55e 100644 --- a/core/src/main/scala/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/spark/api/python/PythonRDD.scala @@ -238,6 +238,11 @@ private[spark] object PythonRDD { } def writeIteratorToPickleFile[T](items: java.util.Iterator[T], filename: String) { + import scala.collection.JavaConverters._ + writeIteratorToPickleFile(items.asScala, filename) + } + + def writeIteratorToPickleFile[T](items: Iterator[T], filename: String) { val file = new DataOutputStream(new FileOutputStream(filename)) for (item <- items) { writeAsPickle(item, file) @@ -245,8 +250,10 @@ private[spark] object PythonRDD { file.close() } - def takePartition[T](rdd: RDD[T], partition: Int): java.util.Iterator[T] = - rdd.context.runJob(rdd, ((x: Iterator[T]) => x), Seq(partition), true).head + def takePartition[T](rdd: RDD[T], partition: Int): Iterator[T] = { + implicit val cm : ClassManifest[T] = rdd.elementClassManifest + rdd.context.runJob(rdd, ((x: Iterator[T]) => x.toArray), Seq(partition), true).head.iterator + } } private object Pickle { diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index fb144bc45d..4cda6cf661 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -372,6 +372,10 @@ class RDD(object): items = [] for partition in range(self._jrdd.splits().size()): iterator = self.ctx._takePartition(self._jrdd.rdd(), partition) + # Each item in the iterator is a string, Python object, batch of + # Python objects. Regardless, it is sufficient to take `num` + # of these objects in order to collect `num` Python objects: + iterator = iterator.take(num) items.extend(self._collect_iterator_through_file(iterator)) if len(items) >= num: break -- cgit v1.2.3 From 9163c3705d98ca19c09fe5618e347b9d20f88f63 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Sat, 2 Feb 2013 23:34:47 -0800 Subject: Formatting --- core/src/main/scala/spark/scheduler/DAGScheduler.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'core') diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index 2a35915560..edbfd1c45f 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -265,7 +265,8 @@ class DAGScheduler( return listener.awaitResult() // Will throw an exception if the job fails } - /** Process one event retrieved from the event queue. + /** + * Process one event retrieved from the event queue. * Returns true if we should stop the event loop. 
*/ private[scheduler] def processEvent(event: DAGSchedulerEvent): Boolean = { -- cgit v1.2.3 From aa4ee1e9e5485c1b96474e704c76225a2b8a7da9 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Mon, 4 Feb 2013 11:06:31 -0800 Subject: Fix failing test --- core/src/test/scala/spark/MapOutputTrackerSuite.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'core') diff --git a/core/src/test/scala/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/spark/MapOutputTrackerSuite.scala index f4e7ec39fe..dd19442dcb 100644 --- a/core/src/test/scala/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/spark/MapOutputTrackerSuite.scala @@ -79,8 +79,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { test("remote fetch") { try { System.clearProperty("spark.driver.host") // In case some previous test had set it - val (actorSystem, boundPort) = - AkkaUtils.createActorSystem("test", "localhost", 0) + val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", "localhost", 0) System.setProperty("spark.driver.port", boundPort.toString) val masterTracker = new MapOutputTracker(actorSystem, true) val slaveTracker = new MapOutputTracker(actorSystem, false) -- cgit v1.2.3 From f6ec547ea7b56ee607a4c2a69206f8952318eaf1 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Mon, 4 Feb 2013 13:14:54 -0800 Subject: Small fix to test for distinct --- core/src/test/scala/spark/RDDSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'core') diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index 89a3687386..fe7deb10d6 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -14,7 +14,7 @@ class RDDSuite extends FunSuite with LocalSparkContext { val dups = sc.makeRDD(Array(1, 1, 2, 2, 3, 3, 4, 4), 2) assert(dups.distinct().count() === 4) assert(dups.distinct.count === 4) // Can distinct and count be called without parentheses? - assert(dups.distinct().collect === dups.distinct().collect) + assert(dups.distinct.collect === dups.distinct().collect) assert(dups.distinct(2).collect === dups.distinct().collect) assert(nums.reduce(_ + _) === 10) assert(nums.fold(0)(_ + _) === 10) -- cgit v1.2.3 From 1ba3393ceb5709620a28b8bc01826153993fc444 Mon Sep 17 00:00:00 2001 From: Stephen Haberman Date: Tue, 5 Feb 2013 17:56:50 -0600 Subject: Increase DriverSuite timeout. --- core/src/test/scala/spark/DriverSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'core') diff --git a/core/src/test/scala/spark/DriverSuite.scala b/core/src/test/scala/spark/DriverSuite.scala index 342610e1dd..5e84b3a66a 100644 --- a/core/src/test/scala/spark/DriverSuite.scala +++ b/core/src/test/scala/spark/DriverSuite.scala @@ -9,10 +9,11 @@ import org.scalatest.time.SpanSugar._ class DriverSuite extends FunSuite with Timeouts { test("driver should exit after finishing") { + assert(System.getenv("SPARK_HOME") != null) // Regression test for SPARK-530: "Spark driver process doesn't exit after finishing" val masters = Table(("master"), ("local"), ("local-cluster[2,1,512]")) forAll(masters) { (master: String) => - failAfter(10 seconds) { + failAfter(30 seconds) { Utils.execute(Seq("./run", "spark.DriverWithoutCleanup", master), new File(System.getenv("SPARK_HOME"))) } -- cgit v1.2.3
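
The "Further simplify checking for Nil" patch above replaces an index-based scan of getCacheLocs(rdd) with a direct contains(Nil) test. A minimal, self-contained Scala sketch of why the two forms answer the same question (the object and method names below are illustrative stand-ins, not the actual DAGScheduler internals):

object CacheLocsCheck {
  // One entry per partition; Nil means that partition is cached nowhere.
  type CacheLocs = Seq[List[String]]

  // Old form: walk the partition indices and compare each entry against Nil.
  def anyMissingOld(locs: CacheLocs): Boolean =
    (0 until locs.size).exists(locs(_) == Nil)

  // New form from the patch: an uncached partition is exactly a Nil entry,
  // so Seq#contains expresses the same predicate directly.
  def anyMissingNew(locs: CacheLocs): Boolean =
    locs.contains(Nil)

  def main(args: Array[String]): Unit = {
    val allCached  = Seq(List("hostA"), List("hostB", "hostC"))
    val oneMissing = Seq(List("hostA"), Nil)
    assert(anyMissingOld(allCached) == anyMissingNew(allCached))   // both false
    assert(anyMissingOld(oneMissing) == anyMissingNew(oneMissing)) // both true
    println("old and new checks agree")
  }
}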
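
The "Fetch fewer objects in PySpark's take() method" patch has takePartition return a plain Scala Iterator so the Python side can stop after num elements instead of pulling a whole partition across the wire. A rough standalone Scala sketch of that loop (the names here are hypothetical; the real change spans PythonRDD.scala and rdd.py and moves data through pickle files):

object LazyTakeSketch {
  // Stand-in for PythonRDD.takePartition: expose one partition's data as an Iterator.
  def takePartition(partition: Seq[Int]): Iterator[Int] = partition.iterator

  // Mirror of the loop in rdd.py's take(): pull at most `num` elements from each
  // partition's iterator and stop scanning partitions once enough are collected.
  def take(partitions: Seq[Seq[Int]], num: Int): Seq[Int] = {
    val collected = scala.collection.mutable.ArrayBuffer[Int]()
    val parts = partitions.iterator
    while (collected.size < num && parts.hasNext) {
      collected ++= takePartition(parts.next()).take(num)
    }
    collected.take(num).toSeq
  }

  def main(args: Array[String]): Unit = {
    val parts = Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9))
    assert(take(parts, 4) == Seq(1, 2, 3, 4)) // the third partition is never touched
    println(take(parts, 4).mkString(", "))
  }
}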