diff options
author | Kay Ousterhout <kayousterhout@gmail.com> | 2014-04-02 10:35:52 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-04-02 10:35:52 -0700 |
commit | 11973a7bdad58fdb759033c232d87f0b279c83b4 (patch) | |
tree | f91b033c3197d49a22c055184e90f40ba7e3fa46 /core/src | |
parent | ea9de658a365dca2b7403d8fab68a8a87c4e06c8 (diff) | |
download | spark-11973a7bdad58fdb759033c232d87f0b279c83b4.tar.gz spark-11973a7bdad58fdb759033c232d87f0b279c83b4.tar.bz2 spark-11973a7bdad58fdb759033c232d87f0b279c83b4.zip |
Renamed stageIdToActiveJob to jobIdToActiveJob.
This data structure was misused and, as a result, later renamed to an incorrect name.
This data structure seems to have gotten into this tangled state as a result of @henrydavidge using the stageID instead of the job Id to index into it and later @andrewor14 renaming the data structure to reflect this misunderstanding.
This patch renames it and removes an incorrect indexing into it. The incorrect indexing into it meant that the code added by @henrydavidge to warn when a task size is too large (added here https://github.com/apache/spark/commit/57579934f0454f258615c10e69ac2adafc5b9835) was not always executed; this commit fixes that.
Author: Kay Ousterhout <kayousterhout@gmail.com>
Closes #301 from kayousterhout/fixCancellation and squashes the following commits:
bd3d3a4 [Kay Ousterhout] Renamed stageIdToActiveJob to jobIdToActiveJob.
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 21 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 2 |
2 files changed, 11 insertions, 12 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 4fce47e1ee..ef3d24d746 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -84,7 +84,7 @@ class DAGScheduler( private[scheduler] val stageIdToJobIds = new TimeStampedHashMap[Int, HashSet[Int]] private[scheduler] val stageIdToStage = new TimeStampedHashMap[Int, Stage] private[scheduler] val shuffleToMapStage = new TimeStampedHashMap[Int, Stage] - private[scheduler] val stageIdToActiveJob = new HashMap[Int, ActiveJob] + private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob] private[scheduler] val resultStageToJob = new HashMap[Stage, ActiveJob] private[spark] val stageToInfos = new TimeStampedHashMap[Stage, StageInfo] @@ -536,7 +536,7 @@ class DAGScheduler( listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties)) runLocally(job) } else { - stageIdToActiveJob(jobId) = job + jobIdToActiveJob(jobId) = job activeJobs += job resultStageToJob(finalStage) = job listenerBus.post( @@ -559,7 +559,7 @@ class DAGScheduler( // Cancel all running jobs. runningStages.map(_.jobId).foreach(handleJobCancellation) activeJobs.clear() // These should already be empty by this point, - stageIdToActiveJob.clear() // but just in case we lost track of some jobs... + jobIdToActiveJob.clear() // but just in case we lost track of some jobs... case ExecutorAdded(execId, host) => handleExecutorAdded(execId, host) @@ -569,7 +569,6 @@ class DAGScheduler( case BeginEvent(task, taskInfo) => for ( - job <- stageIdToActiveJob.get(task.stageId); stage <- stageIdToStage.get(task.stageId); stageInfo <- stageToInfos.get(stage) ) { @@ -697,7 +696,7 @@ class DAGScheduler( private def activeJobForStage(stage: Stage): Option[Int] = { if (stageIdToJobIds.contains(stage.id)) { val jobsThatUseStage: Array[Int] = stageIdToJobIds(stage.id).toArray.sorted - jobsThatUseStage.find(stageIdToActiveJob.contains) + jobsThatUseStage.find(jobIdToActiveJob.contains) } else { None } @@ -750,8 +749,8 @@ class DAGScheduler( } } - val properties = if (stageIdToActiveJob.contains(jobId)) { - stageIdToActiveJob(stage.jobId).properties + val properties = if (jobIdToActiveJob.contains(jobId)) { + jobIdToActiveJob(stage.jobId).properties } else { // this stage will be assigned to "default" pool null @@ -827,7 +826,7 @@ class DAGScheduler( job.numFinished += 1 // If the whole job has finished, remove it if (job.numFinished == job.numPartitions) { - stageIdToActiveJob -= stage.jobId + jobIdToActiveJob -= stage.jobId activeJobs -= job resultStageToJob -= stage markStageAsFinished(stage) @@ -986,11 +985,11 @@ class DAGScheduler( val independentStages = removeJobAndIndependentStages(jobId) independentStages.foreach(taskScheduler.cancelTasks) val error = new SparkException("Job %d cancelled".format(jobId)) - val job = stageIdToActiveJob(jobId) + val job = jobIdToActiveJob(jobId) job.listener.jobFailed(error) jobIdToStageIds -= jobId activeJobs -= job - stageIdToActiveJob -= jobId + jobIdToActiveJob -= jobId listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error, job.finalStage.id))) } } @@ -1011,7 +1010,7 @@ class DAGScheduler( val error = new SparkException("Job aborted: " + reason) job.listener.jobFailed(error) jobIdToStageIdsRemove(job.jobId) - stageIdToActiveJob -= resultStage.jobId + jobIdToActiveJob -= resultStage.jobId activeJobs -= job resultStageToJob -= resultStage listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error, failedStage.id))) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index c97543f57d..ce567b0cde 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -428,7 +428,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont assert(scheduler.pendingTasks.isEmpty) assert(scheduler.activeJobs.isEmpty) assert(scheduler.failedStages.isEmpty) - assert(scheduler.stageIdToActiveJob.isEmpty) + assert(scheduler.jobIdToActiveJob.isEmpty) assert(scheduler.jobIdToStageIds.isEmpty) assert(scheduler.stageIdToJobIds.isEmpty) assert(scheduler.stageIdToStage.isEmpty) |