diff options
author | Mark Hamstra <markhamstra@gmail.com> | 2013-11-22 11:14:39 -0800 |
---|---|---|
committer | Mark Hamstra <markhamstra@gmail.com> | 2013-12-03 09:57:32 -0800 |
commit | 27c45e523620d801d547f167a5a33d71ee3af7b5 (patch) | |
tree | f306bd679a7f5ad4e5187a100c8516a82374a539 | |
parent | 686a420ddc33407050d9019711cbe801fc352fa3 (diff) | |
download | spark-27c45e523620d801d547f167a5a33d71ee3af7b5.tar.gz spark-27c45e523620d801d547f167a5a33d71ee3af7b5.tar.bz2 spark-27c45e523620d801d547f167a5a33d71ee3af7b5.zip |
Cleaned up job cancellation handling
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 12 |
1 files changed, 5 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index b8b3ac0b43..aeac14ad7b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -371,7 +371,7 @@ class DAGScheduler( } // Removes job and any stages that are not needed by any other job. Returns the set of ids for stages that - // were removed and whose associated tasks may need to be cancelled. + // were removed. The associated tasks for those stages need to be cancelled if we got here via job cancellation. private def removeJobAndIndependentStages(jobId: Int): Set[Int] = { val registeredStages = jobIdToStageIds(jobId) val independentStages = new HashSet[Int]() @@ -562,8 +562,6 @@ class DAGScheduler( case JobCancelled(jobId) => handleJobCancellation(jobId) - idToActiveJob.get(jobId).foreach(job => activeJobs -= job) - idToActiveJob -= jobId case JobGroupCancelled(groupId) => // Cancel all jobs belonging to this job group. @@ -571,14 +569,12 @@ class DAGScheduler( val activeInGroup = activeJobs.filter(groupId == _.properties.get(SparkContext.SPARK_JOB_GROUP_ID)) val jobIds = activeInGroup.map(_.jobId) jobIds.foreach { handleJobCancellation } - activeJobs --= activeInGroup - idToActiveJob --= jobIds case AllJobsCancelled => // Cancel all running jobs. running.map(_.jobId).foreach { handleJobCancellation } - activeJobs.clear() - idToActiveJob.clear() + activeJobs.clear() // These should already be empty by this point, + idToActiveJob.clear() // but just in case we lost track of some jobs... case ExecutorGained(execId, host) => handleExecutorGained(execId, host) @@ -998,6 +994,8 @@ class DAGScheduler( job.listener.jobFailed(error) listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, Some(job.finalStage)))) jobIdToStageIds -= jobId + activeJobs -= job + idToActiveJob -= jobId } } |