Diffstat (limited to 'core/src/test')
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 48
1 file changed, 48 insertions(+), 0 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 4536832829..8dd2a9b9f7 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -115,6 +115,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
sc = new SparkContext("local", "DAGSchedulerSuite")
sparkListener.successfulStages.clear()
sparkListener.failedStages.clear()
+ failure = null
sc.addSparkListener(sparkListener)
taskSets.clear()
cancelledStages.clear()
@@ -314,6 +315,53 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
assertDataStructuresEmpty
}

+ test("job cancellation no-kill backend") {
+ // make sure that the DAGScheduler doesn't crash when the TaskScheduler
+ // doesn't implement killTask()
+ val noKillTaskScheduler = new TaskScheduler() {
+ override def rootPool: Pool = null
+ override def schedulingMode: SchedulingMode = SchedulingMode.NONE
+ override def start() = {}
+ override def stop() = {}
+ override def submitTasks(taskSet: TaskSet) = {
+ taskSets += taskSet
+ }
+ override def cancelTasks(stageId: Int, interruptThread: Boolean) {
+ throw new UnsupportedOperationException
+ }
+ override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
+ override def defaultParallelism() = 2
+ }
+ val noKillScheduler = new DAGScheduler(
+ sc,
+ noKillTaskScheduler,
+ sc.listenerBus,
+ mapOutputTracker,
+ blockManagerMaster,
+ sc.env) {
+ override def runLocally(job: ActiveJob) {
+ // don't bother with the thread while unit testing
+ runLocallyWithinThread(job)
+ }
+ }
+ dagEventProcessTestActor = TestActorRef[DAGSchedulerEventProcessActor](
+ Props(classOf[DAGSchedulerEventProcessActor], noKillScheduler))(system)
+ val rdd = makeRdd(1, Nil)
+ val jobId = submit(rdd, Array(0))
+ cancel(jobId)
+ // Because the job wasn't actually cancelled, we shouldn't have received a failure message.
+ assert(failure === null)
+
+ // When the task set completes normally, state should be correctly updated.
+ complete(taskSets(0), Seq((Success, 42)))
+ assert(results === Map(0 -> 42))
+ assertDataStructuresEmpty
+
+ assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+ assert(sparkListener.failedStages.isEmpty)
+ assert(sparkListener.successfulStages.contains(0))
+ }
+
test("run trivial shuffle") {
val shuffleMapRdd = makeRdd(2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
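
The new test exercises a general defensive pattern: the scheduler must tolerate a backend that throws UnsupportedOperationException from its cancellation hook rather than crashing its event loop. Below is a minimal, self-contained Scala sketch of that pattern; all names (Backend, NoKillBackend, Scheduler) are illustrative stand-ins, not Spark's actual internals.

    // Sketch of the defensive-cancellation pattern the test above exercises.
    // Every name here is a hypothetical stand-in, not part of Spark's API.
    trait Backend {
      def submit(taskSetId: Int): Unit
      // Optional capability: some backends cannot cancel running tasks.
      def cancel(taskSetId: Int): Unit
    }

    class NoKillBackend extends Backend {
      override def submit(taskSetId: Int): Unit =
        println(s"submitted task set $taskSetId")
      override def cancel(taskSetId: Int): Unit =
        throw new UnsupportedOperationException("cancellation not supported")
    }

    object Scheduler {
      // Wrap the optional call so a backend without kill support degrades
      // gracefully (the job simply runs to completion) instead of
      // propagating the exception into the scheduling loop.
      def cancelJob(backend: Backend, taskSetId: Int): Unit =
        try backend.cancel(taskSetId)
        catch {
          case _: UnsupportedOperationException =>
            println(s"backend cannot cancel task set $taskSetId; letting it finish")
        }

      def main(args: Array[String]): Unit = {
        val backend = new NoKillBackend
        backend.submit(0)
        cancelJob(backend, 0) // does not throw; mirrors assert(failure === null)
      }
    }

In the test, the same idea is verified end to end: cancel(jobId) against the no-kill TaskScheduler leaves failure null, and the task set still completes normally with its results recorded.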