author     Mark Hamstra <markhamstra@gmail.com>  2014-06-25 20:57:48 -0700
committer  Patrick Wendell <pwendell@gmail.com>  2014-06-25 20:57:48 -0700
commit     b88a59a66845b8935b22f06fc96d16841ed20c94 (patch)
tree       4875813f1c1d1ae0223fa7983fc32e5d84c81752 /core/src/test
parent     7f196b009d26d4aed403b3c694f8b603601718e3 (diff)
[SPARK-1749] Job cancellation when SchedulerBackend does not implement killTask
This is a fixed up version of #686 (cc @markhamstra @pwendell). The last commit (the only one I authored) reflects the changes I made from Mark's original patch.

Author: Mark Hamstra <markhamstra@gmail.com>
Author: Kay Ousterhout <kayousterhout@gmail.com>

Closes #1219 from kayousterhout/mark-SPARK-1749 and squashes the following commits:

42dfa7e [Kay Ousterhout] Got rid of terrible double-negative name
80b3205 [Kay Ousterhout] Don't notify listeners of job failure if it wasn't successfully cancelled.
d156d33 [Mark Hamstra] Do nothing in no-kill submitTasks
9312baa [Mark Hamstra] code review update
cc353c8 [Mark Hamstra] scalastyle
e61f7f8 [Mark Hamstra] Catch UnsupportedOperationException when DAGScheduler tries to cancel a job on a SchedulerBackend that does not implement killTask
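The scheduler-side change itself is outside this test-only view of the commit. As a rough illustration of the pattern the commit message describes (catch the UnsupportedOperationException from a backend that cannot kill tasks, and only treat the job as failed if cancellation actually happened), here is a minimal, self-contained Scala sketch. MiniTaskScheduler, tryCancelStage, and NoKillCancelSketch are illustrative names invented for this sketch, not Spark's actual (private) API:

// Self-contained sketch of the guard described in commits e61f7f8 and
// 80b3205; MiniTaskScheduler is a stand-in for Spark's private
// TaskScheduler trait, not the real API surface.
object NoKillCancelSketch {
  trait MiniTaskScheduler {
    /** May throw UnsupportedOperationException if the backend cannot kill tasks. */
    def cancelTasks(stageId: Int, interruptThread: Boolean): Unit
  }

  /** Returns true only if the backend actually cancelled the stage's tasks. */
  def tryCancelStage(sched: MiniTaskScheduler, stageId: Int): Boolean = {
    try {
      sched.cancelTasks(stageId, interruptThread = false)
      true // cancellation succeeded; the job may now be failed and listeners notified
    } catch {
      case _: UnsupportedOperationException =>
        // Backend cannot kill tasks: swallow the error instead of crashing
        // the scheduler, and do NOT report a job failure -- the running
        // task set is left to complete normally.
        false
    }
  }

  def main(args: Array[String]): Unit = {
    val noKillBackend = new MiniTaskScheduler {
      def cancelTasks(stageId: Int, interruptThread: Boolean): Unit =
        throw new UnsupportedOperationException
    }
    // A cancel request against a no-kill backend is a no-op, not a crash.
    assert(!tryCancelStage(noKillBackend, stageId = 0))
  }
}

The test added in the diff below exercises exactly this behaviour end to end: cancel() against a no-kill backend leaves failure null, and the task set still completes with the expected result.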
Diffstat (limited to 'core/src/test')
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala  |  48
1 file changed, 48 insertions(+), 0 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 4536832829..8dd2a9b9f7 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -115,6 +115,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
sc = new SparkContext("local", "DAGSchedulerSuite")
sparkListener.successfulStages.clear()
sparkListener.failedStages.clear()
+ failure = null
sc.addSparkListener(sparkListener)
taskSets.clear()
cancelledStages.clear()
@@ -314,6 +315,53 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
assertDataStructuresEmpty
}
 
+ test("job cancellation no-kill backend") {
+ // make sure that the DAGScheduler doesn't crash when the TaskScheduler
+ // doesn't implement killTask()
+ val noKillTaskScheduler = new TaskScheduler() {
+ override def rootPool: Pool = null
+ override def schedulingMode: SchedulingMode = SchedulingMode.NONE
+ override def start() = {}
+ override def stop() = {}
+ override def submitTasks(taskSet: TaskSet) = {
+ taskSets += taskSet
+ }
+ override def cancelTasks(stageId: Int, interruptThread: Boolean) {
+ throw new UnsupportedOperationException
+ }
+ override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
+ override def defaultParallelism() = 2
+ }
+ val noKillScheduler = new DAGScheduler(
+ sc,
+ noKillTaskScheduler,
+ sc.listenerBus,
+ mapOutputTracker,
+ blockManagerMaster,
+ sc.env) {
+ override def runLocally(job: ActiveJob) {
+ // don't bother with the thread while unit testing
+ runLocallyWithinThread(job)
+ }
+ }
+ dagEventProcessTestActor = TestActorRef[DAGSchedulerEventProcessActor](
+ Props(classOf[DAGSchedulerEventProcessActor], noKillScheduler))(system)
+ val rdd = makeRdd(1, Nil)
+ val jobId = submit(rdd, Array(0))
+ cancel(jobId)
+ // Because the job wasn't actually cancelled, we shouldn't have received a failure message.
+ assert(failure === null)
+
+ // When the task set completes normally, state should be correctly updated.
+ complete(taskSets(0), Seq((Success, 42)))
+ assert(results === Map(0 -> 42))
+ assertDataStructuresEmpty
+
+ assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+ assert(sparkListener.failedStages.isEmpty)
+ assert(sparkListener.successfulStages.contains(0))
+ }
+
test("run trivial shuffle") {
val shuffleMapRdd = makeRdd(2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)