aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala
diff options
context:
space:
mode:
author: Mark Hamstra <markhamstra@gmail.com> 2014-06-25 20:57:48 -0700
committer: Patrick Wendell <pwendell@gmail.com> 2014-06-25 20:57:48 -0700
commit: b88a59a66845b8935b22f06fc96d16841ed20c94 (patch)
tree: 4875813f1c1d1ae0223fa7983fc32e5d84c81752 /core/src/main/scala
parent: 7f196b009d26d4aed403b3c694f8b603601718e3 (diff)
download: spark-b88a59a66845b8935b22f06fc96d16841ed20c94.tar.gz
spark-b88a59a66845b8935b22f06fc96d16841ed20c94.tar.bz2
spark-b88a59a66845b8935b22f06fc96d16841ed20c94.zip
[SPARK-1749] Job cancellation when SchedulerBackend does not implement killTask
This is a fixed up version of #686 (cc @markhamstra @pwendell). The last commit (the only one I authored) reflects the changes I made from Mark's original patch.

Author: Mark Hamstra <markhamstra@gmail.com>
Author: Kay Ousterhout <kayousterhout@gmail.com>

Closes #1219 from kayousterhout/mark-SPARK-1749 and squashes the following commits:

42dfa7e [Kay Ousterhout] Got rid of terrible double-negative name
80b3205 [Kay Ousterhout] Don't notify listeners of job failure if it wasn't successfully cancelled.
d156d33 [Mark Hamstra] Do nothing in no-kill submitTasks
9312baa [Mark Hamstra] code review update
cc353c8 [Mark Hamstra] scalastyle
e61f7f8 [Mark Hamstra] Catch UnsupportedOperationException when DAGScheduler tries to cancel a job on a SchedulerBackend that does not implement killTask
Diffstat (limited to 'core/src/main/scala')
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 30
1 files changed, 21 insertions, 9 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b3ebaa547d..c8559a7a82 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1038,7 +1038,7 @@ class DAGScheduler(
private def failJobAndIndependentStages(job: ActiveJob, failureReason: String,
resultStage: Option[Stage]) {
val error = new SparkException(failureReason)
- job.listener.jobFailed(error)
+ var ableToCancelStages = true
val shouldInterruptThread =
if (job.properties == null) false
@@ -1062,18 +1062,26 @@ class DAGScheduler(
// This is the only job that uses this stage, so fail the stage if it is running.
val stage = stageIdToStage(stageId)
if (runningStages.contains(stage)) {
- taskScheduler.cancelTasks(stageId, shouldInterruptThread)
- val stageInfo = stageToInfos(stage)
- stageInfo.stageFailed(failureReason)
- listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
+ try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
+ taskScheduler.cancelTasks(stageId, shouldInterruptThread)
+ val stageInfo = stageToInfos(stage)
+ stageInfo.stageFailed(failureReason)
+ listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
+ } catch {
+ case e: UnsupportedOperationException =>
+ logInfo(s"Could not cancel tasks for stage $stageId", e)
+ ableToCancelStages = false
+ }
}
}
}
}
- cleanupStateForJobAndIndependentStages(job, resultStage)
-
- listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error)))
+ if (ableToCancelStages) {
+ job.listener.jobFailed(error)
+ cleanupStateForJobAndIndependentStages(job, resultStage)
+ listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error)))
+ }
}
/**
@@ -1155,7 +1163,11 @@ private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)
case x: Exception =>
logError("eventProcesserActor failed due to the error %s; shutting down SparkContext"
.format(x.getMessage))
- dagScheduler.doCancelAllJobs()
+ try {
+ dagScheduler.doCancelAllJobs()
+ } catch {
+ case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
+ }
dagScheduler.sc.stop()
Stop
}