aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala30
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala48
2 files changed, 69 insertions, 9 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index ff411e24a3..81a87438a8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1031,7 +1031,7 @@ class DAGScheduler(
private def failJobAndIndependentStages(job: ActiveJob, failureReason: String,
resultStage: Option[Stage]) {
val error = new SparkException(failureReason)
- job.listener.jobFailed(error)
+ var ableToCancelStages = true
val shouldInterruptThread =
if (job.properties == null) false
@@ -1055,18 +1055,26 @@ class DAGScheduler(
// This is the only job that uses this stage, so fail the stage if it is running.
val stage = stageIdToStage(stageId)
if (runningStages.contains(stage)) {
- taskScheduler.cancelTasks(stageId, shouldInterruptThread)
- val stageInfo = stageToInfos(stage)
- stageInfo.stageFailed(failureReason)
- listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
+ try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
+ taskScheduler.cancelTasks(stageId, shouldInterruptThread)
+ val stageInfo = stageToInfos(stage)
+ stageInfo.stageFailed(failureReason)
+ listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
+ } catch {
+ case e: UnsupportedOperationException =>
+ logInfo(s"Could not cancel tasks for stage $stageId", e)
+ ableToCancelStages = false
+ }
}
}
}
}
- cleanupStateForJobAndIndependentStages(job, resultStage)
-
- listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error)))
+ if (ableToCancelStages) {
+ job.listener.jobFailed(error)
+ cleanupStateForJobAndIndependentStages(job, resultStage)
+ listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error)))
+ }
}
/**
@@ -1148,7 +1156,11 @@ private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)
case x: Exception =>
logError("eventProcesserActor failed due to the error %s; shutting down SparkContext"
.format(x.getMessage))
- dagScheduler.doCancelAllJobs()
+ try {
+ dagScheduler.doCancelAllJobs()
+ } catch {
+ case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
+ }
dagScheduler.sc.stop()
Stop
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 7e901f8e91..23a350be56 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -114,6 +114,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
sc = new SparkContext("local", "DAGSchedulerSuite")
sparkListener.successfulStages.clear()
sparkListener.failedStages.clear()
+ failure = null
sc.addSparkListener(sparkListener)
taskSets.clear()
cancelledStages.clear()
@@ -299,6 +300,53 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
assertDataStructuresEmpty
}
+ test("job cancellation no-kill backend") {
+ // make sure that the DAGScheduler doesn't crash when the TaskScheduler
+ // doesn't implement killTask()
+ val noKillTaskScheduler = new TaskScheduler() {
+ override def rootPool: Pool = null
+ override def schedulingMode: SchedulingMode = SchedulingMode.NONE
+ override def start() = {}
+ override def stop() = {}
+ override def submitTasks(taskSet: TaskSet) = {
+ taskSets += taskSet
+ }
+ override def cancelTasks(stageId: Int, interruptThread: Boolean) {
+ throw new UnsupportedOperationException
+ }
+ override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
+ override def defaultParallelism() = 2
+ }
+ val noKillScheduler = new DAGScheduler(
+ sc,
+ noKillTaskScheduler,
+ sc.listenerBus,
+ mapOutputTracker,
+ blockManagerMaster,
+ sc.env) {
+ override def runLocally(job: ActiveJob) {
+ // don't bother with the thread while unit testing
+ runLocallyWithinThread(job)
+ }
+ }
+ dagEventProcessTestActor = TestActorRef[DAGSchedulerEventProcessActor](
+ Props(classOf[DAGSchedulerEventProcessActor], noKillScheduler))(system)
+ val rdd = makeRdd(1, Nil)
+ val jobId = submit(rdd, Array(0))
+ cancel(jobId)
+ // Because the job wasn't actually cancelled, we shouldn't have received a failure message.
+ assert(failure === null)
+
+ // When the task set completes normally, state should be correctly updated.
+ complete(taskSets(0), Seq((Success, 42)))
+ assert(results === Map(0 -> 42))
+ assertDataStructuresEmpty
+
+ assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+ assert(sparkListener.failedStages.isEmpty)
+ assert(sparkListener.successfulStages.contains(0))
+ }
+
test("run trivial shuffle") {
val shuffleMapRdd = makeRdd(2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)