Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala                           |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala                             |   2
-rw-r--r--  core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager |   3
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala              | 130
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala                      |  15
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala            |  25
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala              | 564
7 files changed, 728 insertions, 13 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 766e9797f9..a2eadbcbd6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1396,7 +1396,7 @@ class DAGScheduler(
stage.clearFailures()
} else {
stage.latestInfo.stageFailed(errorMessage.get)
- logInfo("%s (%s) failed in %s s".format(stage, stage.name, serviceTime))
+ logInfo(s"$stage (${stage.name}) failed in $serviceTime s due to ${errorMessage.get}")
}
outputCommitCoordinator.stageEnd(stage.id)
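The new message switches from a positional format string to Scala string interpolation and adds the failure reason to the log line. A minimal sketch of the difference, with illustrative values that are not from the commit:

    // Hypothetical values, just to show the two logging styles side by side.
    val stageName = "count at Foo.scala:10"
    val serviceTime = "0.314"
    val errorMessage: Option[String] = Some("fetch failed")
    // Before: positional format string, failure reason not included
    val before = "%s (%s) failed in %s s".format("ResultStage 2", stageName, serviceTime)
    // After: interpolated string, failure reason appended
    val after = s"ResultStage 2 ($stageName) failed in $serviceTime s due to ${errorMessage.get}"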
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
index 80f2bf4122..77fda6fcff 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
@@ -59,7 +59,7 @@ private[spark] class DirectTaskResult[T](
val numUpdates = in.readInt
if (numUpdates == 0) {
- accumUpdates = null
+ accumUpdates = Seq()
} else {
val _accumUpdates = new ArrayBuffer[AccumulatorV2[_, _]]
for (i <- 0 until numUpdates) {
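Returning an empty Seq instead of null means code that merges accumulator updates never needs a null check; iterating an empty sequence is simply a no-op. A rough sketch of the downstream effect, assuming a hypothetical caller that walks the updates:

    import org.apache.spark.util.AccumulatorV2

    // With the old behaviour (accumUpdates = null) this foreach would throw a
    // NullPointerException; with Seq() it harmlessly does nothing.
    val accumUpdates: Seq[AccumulatorV2[_, _]] = Seq()
    accumUpdates.foreach { acc =>
      // merge acc into the driver-side accumulator state
    }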
diff --git a/core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager b/core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
index 3c570ffd8f..757c6d2296 100644
--- a/core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
+++ b/core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
@@ -1 +1,2 @@
-org.apache.spark.scheduler.DummyExternalClusterManager
\ No newline at end of file
+org.apache.spark.scheduler.DummyExternalClusterManager
+org.apache.spark.scheduler.MockExternalClusterManager
\ No newline at end of file
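The new entry registers MockExternalClusterManager alongside the existing dummy, so both can be discovered through the standard JDK service-provider mechanism used to load ExternalClusterManager implementations. A hedged sketch of how entries in this file are typically resolved (the master URL shown is just an example):

    import java.util.ServiceLoader
    import scala.collection.JavaConverters._

    import org.apache.spark.scheduler.ExternalClusterManager

    // Every class named in META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
    // is visible to ServiceLoader; a caller can then pick the manager whose canCreate()
    // accepts a given master URL.
    val managers = ServiceLoader.load(classOf[ExternalClusterManager]).asScala
    val chosen = managers.find(_.canCreate("mock[org.apache.spark.scheduler.SingleCoreMockBackend]"))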
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
new file mode 100644
index 0000000000..6c9d4fb6f3
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import org.apache.spark._
+
+class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorMockBackend] {
+
+ val badHost = "host-0"
+
+ /**
+   * This backend always fails any task that is executed on the bad host, and succeeds
+   * every other task.
+ */
+ def badHostBackend(): Unit = {
+ val task = backend.beginTask()
+ val host = backend.executorIdToExecutor(task.executorId).host
+ if (host == badHost) {
+ backend.taskFailed(task, new RuntimeException("I'm a bad host!"))
+ } else {
+ backend.taskSuccess(task, 42)
+ }
+ }
+
+ // Test demonstrating the issue -- without a config change, the scheduler keeps scheduling
+ // according to locality preferences, and so the job fails
+ testScheduler("If preferred node is bad, without blacklist job will fail") {
+ val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost)
+ withBackend(badHostBackend _) {
+ val jobFuture = submit(rdd, (0 until 10).toArray)
+ val duration = Duration(1, SECONDS)
+ Await.ready(jobFuture, duration)
+ }
+ assert(results.isEmpty)
+ assertDataStructuresEmpty(noFailure = false)
+ }
+
+ // even with the blacklist turned on, if maxTaskFailures is not more than the number
+ // of executors on the bad node, then locality preferences will lead to us cycling through
+ // the executors on the bad node, and still failing the job
+ testScheduler(
+ "With blacklist on, job will still fail if there are too many bad executors on bad host",
+ extraConfs = Seq(
+ // just set this to something much longer than the test duration
+ ("spark.scheduler.executorTaskBlacklistTime", "10000000")
+ )
+ ) {
+ val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost)
+ withBackend(badHostBackend _) {
+ val jobFuture = submit(rdd, (0 until 10).toArray)
+ val duration = Duration(3, SECONDS)
+ Await.ready(jobFuture, duration)
+ }
+ assert(results.isEmpty)
+ assertDataStructuresEmpty(noFailure = false)
+ }
+
+ // Here we run with the blacklist on, and maxTaskFailures high enough that we'll eventually
+ // schedule on a good node and succeed the job
+ testScheduler(
+ "Bad node with multiple executors, job will still succeed with the right confs",
+ extraConfs = Seq(
+ // just set this to something much longer than the test duration
+ ("spark.scheduler.executorTaskBlacklistTime", "10000000"),
+ // this has to be higher than the number of executors on the bad host
+ ("spark.task.maxFailures", "5"),
+ // just to avoid this test taking too long
+ ("spark.locality.wait", "10ms")
+ )
+ ) {
+ val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost)
+ withBackend(badHostBackend _) {
+ val jobFuture = submit(rdd, (0 until 10).toArray)
+ val duration = Duration(1, SECONDS)
+ Await.ready(jobFuture, duration)
+ }
+ assert(results === (0 until 10).map { _ -> 42 }.toMap)
+ assertDataStructuresEmpty(noFailure = true)
+ }
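As a sanity check on the numbers above: with the default topology of MultiExecutorMockBackend (defined below), the bad host has 4 executors, so spark.task.maxFailures = 5 leaves a fifth attempt that the per-executor blacklist can push onto a good host. A tiny illustrative check, with values assumed from the defaults further down in this file:

    val nExecutorsPerHost = 4   // spark.testing.nExecutorsPerHost default below
    val maxTaskFailures = 5     // spark.task.maxFailures in this test
    // The job can only survive the bad host if an attempt remains after every
    // executor on that host has failed (and been blacklisted for) the task.
    assert(maxTaskFailures > nExecutorsPerHost)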
+
+}
+
+class MultiExecutorMockBackend(
+ conf: SparkConf,
+ taskScheduler: TaskSchedulerImpl) extends MockBackend(conf, taskScheduler) {
+
+ val nHosts = conf.getInt("spark.testing.nHosts", 5)
+ val nExecutorsPerHost = conf.getInt("spark.testing.nExecutorsPerHost", 4)
+ val nCoresPerExecutor = conf.getInt("spark.testing.nCoresPerExecutor", 2)
+
+ override val executorIdToExecutor: Map[String, ExecutorTaskStatus] = {
+ (0 until nHosts).flatMap { hostIdx =>
+ val hostName = "host-" + hostIdx
+ (0 until nExecutorsPerHost).map { subIdx =>
+ val executorId = (hostIdx * nExecutorsPerHost + subIdx).toString
+ executorId ->
+ ExecutorTaskStatus(host = hostName, executorId = executorId, nCoresPerExecutor)
+ }
+ }.toMap
+ }
+
+ override def defaultParallelism(): Int = nHosts * nExecutorsPerHost * nCoresPerExecutor
+}
+
+class MockRDDWithLocalityPrefs(
+ sc: SparkContext,
+ numPartitions: Int,
+ shuffleDeps: Seq[ShuffleDependency[Int, Int, Nothing]],
+ val preferredLoc: String) extends MockRDD(sc, numPartitions, shuffleDeps) {
+ override def getPreferredLocations(split: Partition): Seq[String] = {
+ Seq(preferredLoc)
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index fc75c9e022..b6765f0645 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -98,6 +98,8 @@ class DAGSchedulerSuiteDummyException extends Exception
class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts {
+ import DAGSchedulerSuite._
+
val conf = new SparkConf
/** Set of TaskSets the DAGScheduler has requested executed. */
val taskSets = scala.collection.mutable.Buffer[TaskSet]()
@@ -2034,12 +2036,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
}
}
- private def makeMapStatus(host: String, reduces: Int, sizes: Byte = 2): MapStatus =
- MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(sizes))
-
- private def makeBlockManagerId(host: String): BlockManagerId =
- BlockManagerId("exec-" + host, host, 12345)
-
private def assertDataStructuresEmpty(): Unit = {
assert(scheduler.activeJobs.isEmpty)
assert(scheduler.failedStages.isEmpty)
@@ -2079,5 +2075,12 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
}
CompletionEvent(task, reason, result, accumUpdates ++ extraAccumUpdates, taskInfo)
}
+}
+
+object DAGSchedulerSuite {
+ def makeMapStatus(host: String, reduces: Int, sizes: Byte = 2): MapStatus =
+ MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(sizes))
+ def makeBlockManagerId(host: String): BlockManagerId =
+ BlockManagerId("exec-" + host, host, 12345)
}
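Moving these helpers into a companion object lets other suites reuse them; for example, the new SchedulerIntegrationSuite further down in this diff calls them roughly like this (illustrative snippet mirroring its actual usage):

    // Fake a successful shuffle-map task producing output for 10 reduce partitions on hostA,
    // and the BlockManagerId that a FetchFailed against that host would reference.
    val mapStatus = DAGSchedulerSuite.makeMapStatus("hostA", 10)
    val blockManagerId = DAGSchedulerSuite.makeBlockManagerId("hostA")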
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
index 59c1b359a7..e87cebf0cf 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
@@ -27,12 +27,24 @@ class ExternalClusterManagerSuite extends SparkFunSuite with LocalSparkContext {
val conf = new SparkConf().setMaster("myclusterManager").
setAppName("testcm").set("spark.driver.allowMultipleContexts", "true")
sc = new SparkContext(conf)
- // check if the scheduler components are created
- assert(sc.schedulerBackend.isInstanceOf[DummySchedulerBackend])
- assert(sc.taskScheduler.isInstanceOf[DummyTaskScheduler])
+ // check if the scheduler components are created and initialized
+ sc.schedulerBackend match {
+ case dummy: DummySchedulerBackend => assert(dummy.initialized)
+ case other => fail(s"wrong scheduler backend: ${other}")
+ }
+ sc.taskScheduler match {
+ case dummy: DummyTaskScheduler => assert(dummy.initialized)
+ case other => fail(s"wrong task scheduler: ${other}")
+ }
}
}
+/**
+ * Super basic ExternalClusterManager, just to verify ExternalClusterManagers can be configured.
+ *
+ * Note that if you want a special ClusterManager for tests, you are probably much more interested
+ * in [[MockExternalClusterManager]] and the corresponding [[SchedulerIntegrationSuite]].
+ */
private class DummyExternalClusterManager extends ExternalClusterManager {
def canCreate(masterURL: String): Boolean = masterURL == "myclusterManager"
@@ -44,11 +56,15 @@ private class DummyExternalClusterManager extends ExternalClusterManager {
masterURL: String,
scheduler: TaskScheduler): SchedulerBackend = new DummySchedulerBackend()
- def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {}
+ def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
+ scheduler.asInstanceOf[DummyTaskScheduler].initialized = true
+ backend.asInstanceOf[DummySchedulerBackend].initialized = true
+ }
}
private class DummySchedulerBackend extends SchedulerBackend {
+ var initialized = false
def start() {}
def stop() {}
def reviveOffers() {}
@@ -56,6 +72,7 @@ private class DummySchedulerBackend extends SchedulerBackend {
}
private class DummyTaskScheduler extends TaskScheduler {
+ var initialized = false
override def rootPool: Pool = null
override def schedulingMode: SchedulingMode = SchedulingMode.NONE
override def start(): Unit = {}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
new file mode 100644
index 0000000000..02aa5caa73
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler
+
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration.{Duration, SECONDS}
+import scala.reflect.ClassTag
+
+import org.scalactic.TripleEquals
+import org.scalatest.Assertions.AssertionsHelper
+
+import org.apache.spark._
+import org.apache.spark.TaskState._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.{CallSite, Utils}
+
+/**
+ * Tests for the entire scheduler code -- DAGScheduler, TaskSchedulerImpl, TaskSets,
+ * TaskSetManagers.
+ *
+ * Test cases are configured by providing a set of jobs to submit, and then simulating interaction
+ * with Spark's executors via a mocked backend (e.g., task completion, task failure, executors
+ * disconnecting, etc.).
+ */
+abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends SparkFunSuite
+ with LocalSparkContext {
+
+ var taskScheduler: TestTaskScheduler = null
+ var scheduler: DAGScheduler = null
+ var backend: T = _
+
+ override def beforeEach(): Unit = {
+ if (taskScheduler != null) {
+ taskScheduler.runningTaskSets.clear()
+ }
+ results.clear()
+ failure = null
+ super.beforeEach()
+ }
+
+ override def afterEach(): Unit = {
+ super.afterEach()
+ taskScheduler.stop()
+ backend.stop()
+ scheduler.stop()
+ }
+
+ def setupScheduler(conf: SparkConf): Unit = {
+ conf.setAppName(this.getClass().getSimpleName())
+ val backendClassName = implicitly[ClassTag[T]].runtimeClass.getName()
+ conf.setMaster(s"mock[${backendClassName}]")
+ sc = new SparkContext(conf)
+ backend = sc.schedulerBackend.asInstanceOf[T]
+ taskScheduler = sc.taskScheduler.asInstanceOf[TestTaskScheduler]
+ taskScheduler.initialize(sc.schedulerBackend)
+ scheduler = new DAGScheduler(sc, taskScheduler)
+ taskScheduler.setDAGScheduler(scheduler)
+ }
+
+ def testScheduler(name: String)(testBody: => Unit): Unit = {
+ testScheduler(name, Seq())(testBody)
+ }
+
+ def testScheduler(name: String, extraConfs: Seq[(String, String)])(testBody: => Unit): Unit = {
+ test(name) {
+ val conf = new SparkConf()
+ extraConfs.foreach{ case (k, v) => conf.set(k, v)}
+ setupScheduler(conf)
+ testBody
+ }
+ }
+
+ val results = new HashMap[Int, Any]()
+ var failure: Throwable = _
+
+ /**
+   * When we submit dummy jobs, this is the compute function we supply.
+ */
+ private val jobComputeFunc: (TaskContext, scala.Iterator[_]) => Any = {
+ (context: TaskContext, it: Iterator[(_)]) =>
+ throw new RuntimeException("jobComputeFunc shouldn't get called in this mock")
+ }
+
+ /** Submits a job to the scheduler, and returns a future which does a bit of error handling. */
+ protected def submit(
+ rdd: RDD[_],
+ partitions: Array[Int],
+ func: (TaskContext, Iterator[_]) => _ = jobComputeFunc): Future[Any] = {
+ val waiter: JobWaiter[Any] = scheduler.submitJob(rdd, func, partitions.toSeq, CallSite("", ""),
+ (index, res) => results(index) = res, new Properties())
+ import scala.concurrent.ExecutionContext.Implicits.global
+ waiter.completionFuture.recover { case ex =>
+ failure = ex
+ }
+ }
+
+ protected def assertDataStructuresEmpty(noFailure: Boolean = true): Unit = {
+ if (noFailure) {
+ if (failure != null) {
+ // if there is a job failure, it can be a bit hard to tease the job failure msg apart
+ // from the test failure msg, so we do a little extra formatting
+ val msg =
+ raw"""
+ | There was a failed job.
+ | ----- Begin Job Failure Msg -----
+ | ${Utils.exceptionString(failure)}
+ | ----- End Job Failure Msg ----
+ """.
+ stripMargin
+ fail(msg)
+ }
+ // When a job fails, we terminate before waiting for all the task end events to come in,
+ // so there might still be a running task set. So we only check these conditions
+ // when the job succeeds
+ assert(taskScheduler.runningTaskSets.isEmpty)
+ assert(!backend.hasTasks)
+ }
+ assert(scheduler.activeJobs.isEmpty)
+ }
+
+ /**
+ * Looks at all shuffleMapOutputs that are dependencies of the given RDD, and makes sure
+ * they are all registered
+ */
+ def assertMapOutputAvailable(targetRdd: MockRDD): Unit = {
+ val shuffleIds = targetRdd.shuffleDeps.map{_.shuffleId}
+ val nParts = targetRdd.numPartitions
+ for {
+ shuffleId <- shuffleIds
+ reduceIdx <- (0 until nParts)
+ } {
+ val statuses = taskScheduler.mapOutputTracker.getMapSizesByExecutorId(shuffleId, reduceIdx)
+ // really we should have already thrown an exception rather than fail either of these
+ // asserts, but just to be extra defensive let's double check the statuses are OK
+ assert(statuses != null)
+ assert(statuses.nonEmpty)
+ }
+ }
+
+ /** models a stage boundary with a single dependency, like a shuffle */
+ def shuffle(nParts: Int, input: MockRDD): MockRDD = {
+ val partitioner = new HashPartitioner(nParts)
+ val shuffleDep = new ShuffleDependency[Int, Int, Nothing](input, partitioner)
+ new MockRDD(sc, nParts, List(shuffleDep))
+ }
+
+ /** models a stage boundary with multiple dependencies, like a join */
+ def join(nParts: Int, inputs: MockRDD*): MockRDD = {
+ val partitioner = new HashPartitioner(nParts)
+ val shuffleDeps = inputs.map { inputRDD =>
+ new ShuffleDependency[Int, Int, Nothing](inputRDD, partitioner)
+ }
+ new MockRDD(sc, nParts, shuffleDeps)
+ }
+
+ /**
+   * Helper which makes it a little easier to set up a test: it starts a mock backend in another
+   * thread, responding to tasks with your custom function. You also supply the "body" of your
+   * test, where you submit jobs to your backend, wait for them to complete, and then check
+   * whatever conditions you want. Note that this is *not* safe against arbitrary bad backends --
+   * in particular, your `backendFunc` has to return quickly and it can't throw errors (instead
+   * it should send back the right TaskEndReason).
+ */
+ def withBackend[T](backendFunc: () => Unit)(testBody: => T): T = {
+ val backendContinue = new AtomicBoolean(true)
+ val backendThread = new Thread("mock backend thread") {
+ override def run(): Unit = {
+ while (backendContinue.get()) {
+ if (backend.hasTasksWaitingToRun) {
+ backendFunc()
+ } else {
+ Thread.sleep(10)
+ }
+ }
+ }
+ }
+ try {
+ backendThread.start()
+ testBody
+ } finally {
+ backendContinue.set(false)
+ backendThread.join()
+ }
+ }
+
+}
+
+/**
+ * Helper for running a backend in integration tests; it does a bunch of the book-keeping
+ * so individual tests can focus on just responding to tasks. Individual tests will use
+ * [[beginTask]], [[taskSuccess]], and [[taskFailed]].
+ */
+private[spark] abstract class MockBackend(
+ conf: SparkConf,
+ val taskScheduler: TaskSchedulerImpl) extends SchedulerBackend with Logging {
+
+ /**
+ * Test backends should call this to get a task that has been assigned to them by the scheduler.
+ * Each task should be responded to with either [[taskSuccess]] or [[taskFailed]].
+ */
+ def beginTask(): TaskDescription = {
+ synchronized {
+ val toRun = assignedTasksWaitingToRun.remove(assignedTasksWaitingToRun.size - 1)
+ runningTasks += toRun
+ toRun
+ }
+ }
+
+ /**
+ * Tell the scheduler the task completed successfully, with the given result. Also
+ * updates some internal state for this mock.
+ */
+ def taskSuccess(task: TaskDescription, result: Any): Unit = {
+ val ser = env.serializer.newInstance()
+ val resultBytes = ser.serialize(result)
+ val directResult = new DirectTaskResult(resultBytes, Seq()) // no accumulator updates
+ taskUpdate(task, TaskState.FINISHED, directResult)
+ }
+
+ /**
+ * Tell the scheduler the task failed, with the given state and result (probably ExceptionFailure
+ * or FetchFailed). Also updates some internal state for this mock.
+ */
+ def taskFailed(task: TaskDescription, exc: Exception): Unit = {
+ taskUpdate(task, TaskState.FAILED, new ExceptionFailure(exc, Seq()))
+ }
+
+ def taskFailed(task: TaskDescription, reason: TaskFailedReason): Unit = {
+ taskUpdate(task, TaskState.FAILED, reason)
+ }
+
+ def taskUpdate(task: TaskDescription, state: TaskState, result: Any): Unit = {
+ val ser = env.serializer.newInstance()
+ val resultBytes = ser.serialize(result)
+    // statusUpdate is safe to call from multiple threads; it's protected inside taskScheduler
+ taskScheduler.statusUpdate(task.taskId, state, resultBytes)
+ if (TaskState.isFinished(state)) {
+ synchronized {
+ runningTasks -= task
+ executorIdToExecutor(task.executorId).freeCores += taskScheduler.CPUS_PER_TASK
+ freeCores += taskScheduler.CPUS_PER_TASK
+ }
+ reviveOffers()
+ }
+ }
+
+ // protected by this
+ private val assignedTasksWaitingToRun = new ArrayBuffer[TaskDescription](10000)
+ // protected by this
+ private val runningTasks = ArrayBuffer[TaskDescription]()
+
+ def hasTasks: Boolean = synchronized {
+ assignedTasksWaitingToRun.nonEmpty || runningTasks.nonEmpty
+ }
+
+ def hasTasksWaitingToRun: Boolean = {
+ assignedTasksWaitingToRun.nonEmpty
+ }
+
+ override def start(): Unit = {}
+
+ override def stop(): Unit = {}
+
+ val env = SparkEnv.get
+
+ /** Accessed by both scheduling and backend thread, so should be protected by this. */
+ var freeCores: Int = _
+
+ /**
+ * Accessed by both scheduling and backend thread, so should be protected by this.
+   * Most likely the only things that need to be protected are the individual ExecutorTaskStatus
+   * entries, but for simplicity in this mock we just lock the whole backend.
+ */
+ def executorIdToExecutor: Map[String, ExecutorTaskStatus]
+
+ private def generateOffers(): Seq[WorkerOffer] = {
+ executorIdToExecutor.values.filter { exec =>
+ exec.freeCores > 0
+ }.map { exec =>
+ WorkerOffer(executorId = exec.executorId, host = exec.host,
+ cores = exec.freeCores)
+ }.toSeq
+ }
+
+ /**
+ * This is called by the scheduler whenever it has tasks it would like to schedule. It gets
+ * called in the scheduling thread, not the backend thread.
+ */
+ override def reviveOffers(): Unit = {
+ val offers: Seq[WorkerOffer] = generateOffers()
+ val newTasks = taskScheduler.resourceOffers(offers).flatten
+ synchronized {
+ newTasks.foreach { task =>
+ executorIdToExecutor(task.executorId).freeCores -= taskScheduler.CPUS_PER_TASK
+ }
+ freeCores -= newTasks.size * taskScheduler.CPUS_PER_TASK
+ assignedTasksWaitingToRun ++= newTasks
+ }
+ }
+
+ override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {
+ // We have to implement this b/c of SPARK-15385.
+    // It's OK for this to be a no-op, because even if a backend does implement killTask,
+    // it really can only be "best-effort" in any case, and the scheduler should be robust to that.
+    // And in fact it reasonably simulates a case where a real backend finishes tasks in between
+    // the time when the scheduler sends the msg to kill tasks and the backend receives the msg.
+ }
+}
+
+/**
+ * A very simple mock backend that can just run one task at a time.
+ */
+private[spark] class SingleCoreMockBackend(
+ conf: SparkConf,
+ taskScheduler: TaskSchedulerImpl) extends MockBackend(conf, taskScheduler) {
+
+ val cores = 1
+
+ override def defaultParallelism(): Int = conf.getInt("spark.default.parallelism", cores)
+
+ freeCores = cores
+ val localExecutorId = SparkContext.DRIVER_IDENTIFIER
+ val localExecutorHostname = "localhost"
+
+ override val executorIdToExecutor: Map[String, ExecutorTaskStatus] = Map(
+ localExecutorId -> new ExecutorTaskStatus(localExecutorHostname, localExecutorId, freeCores)
+ )
+}
+
+case class ExecutorTaskStatus(host: String, executorId: String, var freeCores: Int)
+
+class MockRDD(
+ sc: SparkContext,
+ val numPartitions: Int,
+ val shuffleDeps: Seq[ShuffleDependency[Int, Int, Nothing]]
+) extends RDD[(Int, Int)](sc, shuffleDeps) with Serializable {
+
+ MockRDD.validate(numPartitions, shuffleDeps)
+
+ override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
+ throw new RuntimeException("should not be reached")
+ override def getPartitions: Array[Partition] = {
+ (0 until numPartitions).map(i => new Partition {
+ override def index: Int = i
+ }).toArray
+ }
+ override def getPreferredLocations(split: Partition): Seq[String] = Nil
+ override def toString: String = "MockRDD " + id
+}
+
+object MockRDD extends AssertionsHelper with TripleEquals {
+ /**
+ * make sure all the shuffle dependencies have a consistent number of output partitions
+ * (mostly to make sure the test setup makes sense, not that Spark itself would get this wrong)
+ */
+ def validate(numPartitions: Int, dependencies: Seq[ShuffleDependency[_, _, _]]): Unit = {
+ dependencies.foreach { dependency =>
+ val partitioner = dependency.partitioner
+ assert(partitioner != null)
+ assert(partitioner.numPartitions === numPartitions)
+ }
+ }
+}
+
+/** Simple cluster manager that wires up our mock backend. */
+private class MockExternalClusterManager extends ExternalClusterManager {
+
+ val MOCK_REGEX = """mock\[(.*)\]""".r
+ def canCreate(masterURL: String): Boolean = MOCK_REGEX.findFirstIn(masterURL).isDefined
+
+ def createTaskScheduler(
+ sc: SparkContext,
+ masterURL: String): TaskScheduler = {
+ new TestTaskScheduler(sc)
+ }
+
+ def createSchedulerBackend(
+ sc: SparkContext,
+ masterURL: String,
+ scheduler: TaskScheduler): SchedulerBackend = {
+ masterURL match {
+ case MOCK_REGEX(backendClassName) =>
+ val backendClass = Utils.classForName(backendClassName)
+ val ctor = backendClass.getConstructor(classOf[SparkConf], classOf[TaskSchedulerImpl])
+ ctor.newInstance(sc.getConf, scheduler).asInstanceOf[SchedulerBackend]
+ }
+ }
+
+ def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
+ scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
+ }
+}
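The mock master URL embeds the fully-qualified backend class name; setupScheduler above builds it as mock[<backend class name>]. A small sketch of how the regex turns such a URL back into a backend class (the URL here is illustrative):

    val MOCK_REGEX = """mock\[(.*)\]""".r
    "mock[org.apache.spark.scheduler.SingleCoreMockBackend]" match {
      case MOCK_REGEX(backendClassName) =>
        // createSchedulerBackend would reflectively instantiate this class with
        // a (SparkConf, TaskSchedulerImpl) constructor.
        println(backendClassName)
    }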
+
+/** TaskSchedulerImpl that just tracks a tiny bit more state to enable checks in tests. */
+class TestTaskScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {
+ /** Set of TaskSets the DAGScheduler has requested executed. */
+ val runningTaskSets = HashSet[TaskSet]()
+
+ override def submitTasks(taskSet: TaskSet): Unit = {
+ runningTaskSets += taskSet
+ super.submitTasks(taskSet)
+ }
+
+ override def taskSetFinished(manager: TaskSetManager): Unit = {
+ runningTaskSets -= manager.taskSet
+ super.taskSetFinished(manager)
+ }
+}
+
+/**
+ * Some very basic tests just to demonstrate the use of the test framework (and verify that it
+ * works).
+ */
+class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCoreMockBackend] {
+
+ /**
+   * Very simple one-stage job. The backend successfully completes each task, one by one.
+ */
+ testScheduler("super simple job") {
+ def runBackend(): Unit = {
+ val task = backend.beginTask()
+ backend.taskSuccess(task, 42)
+ }
+ withBackend(runBackend _) {
+ val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
+ val duration = Duration(1, SECONDS)
+ Await.ready(jobFuture, duration)
+ }
+ assert(results === (0 until 10).map { _ -> 42 }.toMap)
+ assertDataStructuresEmpty()
+ }
+
+ /**
+ * 5 stage job, diamond dependencies.
+ *
+ * a ----> b ----> d --> result
+   *    \--> c --/
+ *
+ * Backend successfully completes each task
+ */
+ testScheduler("multi-stage job") {
+
+ def stageToOutputParts(stageId: Int): Int = {
+ stageId match {
+ case 0 => 10
+ case 2 => 20
+ case _ => 30
+ }
+ }
+
+ val a = new MockRDD(sc, 2, Nil)
+ val b = shuffle(10, a)
+ val c = shuffle(20, a)
+ val d = join(30, b, c)
+
+ def runBackend(): Unit = {
+ val taskDescription = backend.beginTask()
+ val taskSet = taskScheduler.taskIdToTaskSetManager(taskDescription.taskId).taskSet
+ val task = taskSet.tasks(taskDescription.index)
+
+ // make sure the required map output is available
+ task.stageId match {
+ case 1 => assertMapOutputAvailable(b)
+ case 3 => assertMapOutputAvailable(c)
+ case 4 => assertMapOutputAvailable(d)
+ case _ => // no shuffle map input, nothing to check
+ }
+
+ (task.stageId, task.stageAttemptId, task.partitionId) match {
+ case (stage, 0, _) if stage < 4 =>
+ backend.taskSuccess(taskDescription,
+ DAGSchedulerSuite.makeMapStatus("hostA", stageToOutputParts(stage)))
+ case (4, 0, partition) =>
+ backend.taskSuccess(taskDescription, 4321 + partition)
+ }
+ }
+ withBackend(runBackend _) {
+ val jobFuture = submit(d, (0 until 30).toArray)
+ val duration = Duration(1, SECONDS)
+ Await.ready(jobFuture, duration)
+ }
+ assert(results === (0 until 30).map { idx => idx -> (4321 + idx) }.toMap)
+ assertDataStructuresEmpty()
+ }
+
+ /**
+ * 2 stage job, with a fetch failure. Make sure that:
+ * (a) map output is available whenever we run stage 1
+ * (b) we get a second attempt for stage 0 & stage 1
+ */
+ testScheduler("job with fetch failure") {
+ val input = new MockRDD(sc, 2, Nil)
+ val shuffledRdd = shuffle(10, input)
+ val shuffleId = shuffledRdd.shuffleDeps.head.shuffleId
+
+ val stageToAttempts = new HashMap[Int, HashSet[Int]]()
+
+ def runBackend(): Unit = {
+ val taskDescription = backend.beginTask()
+ val taskSet = taskScheduler.taskIdToTaskSetManager(taskDescription.taskId).taskSet
+ val task = taskSet.tasks(taskDescription.index)
+ stageToAttempts.getOrElseUpdate(task.stageId, new HashSet()) += task.stageAttemptId
+
+ // make sure the required map output is available
+ task.stageId match {
+ case 1 => assertMapOutputAvailable(shuffledRdd)
+ case _ => // no shuffle map input, nothing to check
+ }
+
+ (task.stageId, task.stageAttemptId, task.partitionId) match {
+ case (0, _, _) =>
+ backend.taskSuccess(taskDescription, DAGSchedulerSuite.makeMapStatus("hostA", 10))
+ case (1, 0, 0) =>
+ val fetchFailed = FetchFailed(
+ DAGSchedulerSuite.makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored")
+ backend.taskFailed(taskDescription, fetchFailed)
+ case (1, _, partition) =>
+ backend.taskSuccess(taskDescription, 42 + partition)
+ }
+ }
+ withBackend(runBackend _) {
+ val jobFuture = submit(shuffledRdd, (0 until 10).toArray)
+ val duration = Duration(1, SECONDS)
+ Await.ready(jobFuture, duration)
+ }
+ assert(results === (0 until 10).map { idx => idx -> (42 + idx) }.toMap)
+ assert(stageToAttempts === Map(0 -> Set(0, 1), 1 -> Set(0, 1)))
+ assertDataStructuresEmpty()
+ }
+
+ testScheduler("job failure after 4 attempts") {
+ def runBackend(): Unit = {
+ val task = backend.beginTask()
+ backend.taskFailed(task, new RuntimeException("test task failure"))
+ }
+ withBackend(runBackend _) {
+ val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
+ val duration = Duration(1, SECONDS)
+ Await.ready(jobFuture, duration)
+      assert(failure.getMessage.contains("test task failure"))
+ }
+ assert(results.isEmpty)
+ assertDataStructuresEmpty(noFailure = false)
+ }
+}