diff options
author | Mark Hamstra <markhamstra@gmail.com> | 2014-06-25 20:57:48 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-06-25 20:57:48 -0700 |
commit | b88a59a66845b8935b22f06fc96d16841ed20c94 (patch) | |
tree | 4875813f1c1d1ae0223fa7983fc32e5d84c81752 | |
parent | 7f196b009d26d4aed403b3c694f8b603601718e3 (diff) | |
download | spark-b88a59a66845b8935b22f06fc96d16841ed20c94.tar.gz spark-b88a59a66845b8935b22f06fc96d16841ed20c94.tar.bz2 spark-b88a59a66845b8935b22f06fc96d16841ed20c94.zip |
[SPARK-1749] Job cancellation when SchedulerBackend does not implement killTask
This is a fixed up version of #686 (cc @markhamstra @pwendell). The last commit (the only one I authored) reflects the changes I made from Mark's original patch.
Author: Mark Hamstra <markhamstra@gmail.com>
Author: Kay Ousterhout <kayousterhout@gmail.com>
Closes #1219 from kayousterhout/mark-SPARK-1749 and squashes the following commits:
42dfa7e [Kay Ousterhout] Got rid of terrible double-negative name
80b3205 [Kay Ousterhout] Don't notify listeners of job failure if it wasn't successfully cancelled.
d156d33 [Mark Hamstra] Do nothing in no-kill submitTasks
9312baa [Mark Hamstra] code review update
cc353c8 [Mark Hamstra] scalastyle
e61f7f8 [Mark Hamstra] Catch UnsupportedOperationException when DAGScheduler tries to cancel a job on a SchedulerBackend that does not implement killTask
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 30 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 48 |
2 files changed, 69 insertions, 9 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index b3ebaa547d..c8559a7a82 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1038,7 +1038,7 @@ class DAGScheduler( private def failJobAndIndependentStages(job: ActiveJob, failureReason: String, resultStage: Option[Stage]) { val error = new SparkException(failureReason) - job.listener.jobFailed(error) + var ableToCancelStages = true val shouldInterruptThread = if (job.properties == null) false @@ -1062,18 +1062,26 @@ class DAGScheduler( // This is the only job that uses this stage, so fail the stage if it is running. val stage = stageIdToStage(stageId) if (runningStages.contains(stage)) { - taskScheduler.cancelTasks(stageId, shouldInterruptThread) - val stageInfo = stageToInfos(stage) - stageInfo.stageFailed(failureReason) - listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage))) + try { // cancelTasks will fail if a SchedulerBackend does not implement killTask + taskScheduler.cancelTasks(stageId, shouldInterruptThread) + val stageInfo = stageToInfos(stage) + stageInfo.stageFailed(failureReason) + listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage))) + } catch { + case e: UnsupportedOperationException => + logInfo(s"Could not cancel tasks for stage $stageId", e) + ableToCancelStages = false + } } } } } - cleanupStateForJobAndIndependentStages(job, resultStage) - - listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error))) + if (ableToCancelStages) { + job.listener.jobFailed(error) + cleanupStateForJobAndIndependentStages(job, resultStage) + listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error))) + } } /** @@ -1155,7 +1163,11 @@ private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler) case x: Exception => logError("eventProcesserActor failed due to the error %s; shutting down SparkContext" .format(x.getMessage)) - dagScheduler.doCancelAllJobs() + try { + dagScheduler.doCancelAllJobs() + } catch { + case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t) + } dagScheduler.sc.stop() Stop } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 4536832829..8dd2a9b9f7 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -115,6 +115,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F sc = new SparkContext("local", "DAGSchedulerSuite") sparkListener.successfulStages.clear() sparkListener.failedStages.clear() + failure = null sc.addSparkListener(sparkListener) taskSets.clear() cancelledStages.clear() @@ -314,6 +315,53 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F assertDataStructuresEmpty } + test("job cancellation no-kill backend") { + // make sure that the DAGScheduler doesn't crash when the TaskScheduler + // doesn't implement killTask() + val noKillTaskScheduler = new TaskScheduler() { + override def rootPool: Pool = null + override def schedulingMode: SchedulingMode = SchedulingMode.NONE + override def start() = {} + override def stop() = {} + override def submitTasks(taskSet: TaskSet) = { + taskSets += taskSet + } + override def cancelTasks(stageId: Int, interruptThread: Boolean) { + throw new UnsupportedOperationException + } + override def setDAGScheduler(dagScheduler: DAGScheduler) = {} + override def defaultParallelism() = 2 + } + val noKillScheduler = new DAGScheduler( + sc, + noKillTaskScheduler, + sc.listenerBus, + mapOutputTracker, + blockManagerMaster, + sc.env) { + override def runLocally(job: ActiveJob) { + // don't bother with the thread while unit testing + runLocallyWithinThread(job) + } + } + dagEventProcessTestActor = TestActorRef[DAGSchedulerEventProcessActor]( + Props(classOf[DAGSchedulerEventProcessActor], noKillScheduler))(system) + val rdd = makeRdd(1, Nil) + val jobId = submit(rdd, Array(0)) + cancel(jobId) + // Because the job wasn't actually cancelled, we shouldn't have received a failure message. + assert(failure === null) + + // When the task set completes normally, state should be correctly updated. + complete(taskSets(0), Seq((Success, 42))) + assert(results === Map(0 -> 42)) + assertDataStructuresEmpty + + assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + assert(sparkListener.failedStages.isEmpty) + assert(sparkListener.successfulStages.contains(0)) + } + test("run trivial shuffle") { val shuffleMapRdd = makeRdd(2, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) |