aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMark Hamstra <markhamstra@gmail.com>2013-11-22 11:14:39 -0800
committerMark Hamstra <markhamstra@gmail.com>2013-12-03 09:57:32 -0800
commit27c45e523620d801d547f167a5a33d71ee3af7b5 (patch)
treef306bd679a7f5ad4e5187a100c8516a82374a539 /core
parent686a420ddc33407050d9019711cbe801fc352fa3 (diff)
downloadspark-27c45e523620d801d547f167a5a33d71ee3af7b5.tar.gz
spark-27c45e523620d801d547f167a5a33d71ee3af7b5.tar.bz2
spark-27c45e523620d801d547f167a5a33d71ee3af7b5.zip
Cleaned up job cancellation handling
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala12
1 file changed, 5 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b8b3ac0b43..aeac14ad7b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -371,7 +371,7 @@ class DAGScheduler(
}
// Removes job and any stages that are not needed by any other job. Returns the set of ids for stages that
- // were removed and whose associated tasks may need to be cancelled.
+ // were removed. The associated tasks for those stages need to be cancelled if we got here via job cancellation.
private def removeJobAndIndependentStages(jobId: Int): Set[Int] = {
val registeredStages = jobIdToStageIds(jobId)
val independentStages = new HashSet[Int]()
@@ -562,8 +562,6 @@ class DAGScheduler(
case JobCancelled(jobId) =>
handleJobCancellation(jobId)
- idToActiveJob.get(jobId).foreach(job => activeJobs -= job)
- idToActiveJob -= jobId
case JobGroupCancelled(groupId) =>
// Cancel all jobs belonging to this job group.
@@ -571,14 +569,12 @@ class DAGScheduler(
val activeInGroup = activeJobs.filter(groupId == _.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
val jobIds = activeInGroup.map(_.jobId)
jobIds.foreach { handleJobCancellation }
- activeJobs --= activeInGroup
- idToActiveJob --= jobIds
case AllJobsCancelled =>
// Cancel all running jobs.
running.map(_.jobId).foreach { handleJobCancellation }
- activeJobs.clear()
- idToActiveJob.clear()
+ activeJobs.clear() // These should already be empty by this point,
+ idToActiveJob.clear() // but just in case we lost track of some jobs...
case ExecutorGained(execId, host) =>
handleExecutorGained(execId, host)
@@ -998,6 +994,8 @@ class DAGScheduler(
job.listener.jobFailed(error)
listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, Some(job.finalStage))))
jobIdToStageIds -= jobId
+ activeJobs -= job
+ idToActiveJob -= jobId
}
}