aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKay Ousterhout <kayousterhout@gmail.com>2014-04-02 10:35:52 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-04-02 10:35:52 -0700
commit11973a7bdad58fdb759033c232d87f0b279c83b4 (patch)
treef91b033c3197d49a22c055184e90f40ba7e3fa46
parentea9de658a365dca2b7403d8fab68a8a87c4e06c8 (diff)
downloadspark-11973a7bdad58fdb759033c232d87f0b279c83b4.tar.gz
spark-11973a7bdad58fdb759033c232d87f0b279c83b4.tar.bz2
spark-11973a7bdad58fdb759033c232d87f0b279c83b4.zip
Renamed stageIdToActiveJob to jobIdToActiveJob.
This data structure was misused and, as a result, later renamed to an incorrect name. This data structure seems to have gotten into this tangled state as a result of @henrydavidge using the stageID instead of the job Id to index into it and later @andrewor14 renaming the data structure to reflect this misunderstanding. This patch renames it and removes an incorrect indexing into it. The incorrect indexing into it meant that the code added by @henrydavidge to warn when a task size is too large (added here https://github.com/apache/spark/commit/57579934f0454f258615c10e69ac2adafc5b9835) was not always executed; this commit fixes that. Author: Kay Ousterhout <kayousterhout@gmail.com> Closes #301 from kayousterhout/fixCancellation and squashes the following commits: bd3d3a4 [Kay Ousterhout] Renamed stageIdToActiveJob to jobIdToActiveJob.
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala21
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala2
2 files changed, 11 insertions, 12 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 4fce47e1ee..ef3d24d746 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -84,7 +84,7 @@ class DAGScheduler(
private[scheduler] val stageIdToJobIds = new TimeStampedHashMap[Int, HashSet[Int]]
private[scheduler] val stageIdToStage = new TimeStampedHashMap[Int, Stage]
private[scheduler] val shuffleToMapStage = new TimeStampedHashMap[Int, Stage]
- private[scheduler] val stageIdToActiveJob = new HashMap[Int, ActiveJob]
+ private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]
private[scheduler] val resultStageToJob = new HashMap[Stage, ActiveJob]
private[spark] val stageToInfos = new TimeStampedHashMap[Stage, StageInfo]
@@ -536,7 +536,7 @@ class DAGScheduler(
listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
runLocally(job)
} else {
- stageIdToActiveJob(jobId) = job
+ jobIdToActiveJob(jobId) = job
activeJobs += job
resultStageToJob(finalStage) = job
listenerBus.post(
@@ -559,7 +559,7 @@ class DAGScheduler(
// Cancel all running jobs.
runningStages.map(_.jobId).foreach(handleJobCancellation)
activeJobs.clear() // These should already be empty by this point,
- stageIdToActiveJob.clear() // but just in case we lost track of some jobs...
+ jobIdToActiveJob.clear() // but just in case we lost track of some jobs...
case ExecutorAdded(execId, host) =>
handleExecutorAdded(execId, host)
@@ -569,7 +569,6 @@ class DAGScheduler(
case BeginEvent(task, taskInfo) =>
for (
- job <- stageIdToActiveJob.get(task.stageId);
stage <- stageIdToStage.get(task.stageId);
stageInfo <- stageToInfos.get(stage)
) {
@@ -697,7 +696,7 @@ class DAGScheduler(
private def activeJobForStage(stage: Stage): Option[Int] = {
if (stageIdToJobIds.contains(stage.id)) {
val jobsThatUseStage: Array[Int] = stageIdToJobIds(stage.id).toArray.sorted
- jobsThatUseStage.find(stageIdToActiveJob.contains)
+ jobsThatUseStage.find(jobIdToActiveJob.contains)
} else {
None
}
@@ -750,8 +749,8 @@ class DAGScheduler(
}
}
- val properties = if (stageIdToActiveJob.contains(jobId)) {
- stageIdToActiveJob(stage.jobId).properties
+ val properties = if (jobIdToActiveJob.contains(jobId)) {
+ jobIdToActiveJob(stage.jobId).properties
} else {
// this stage will be assigned to "default" pool
null
@@ -827,7 +826,7 @@ class DAGScheduler(
job.numFinished += 1
// If the whole job has finished, remove it
if (job.numFinished == job.numPartitions) {
- stageIdToActiveJob -= stage.jobId
+ jobIdToActiveJob -= stage.jobId
activeJobs -= job
resultStageToJob -= stage
markStageAsFinished(stage)
@@ -986,11 +985,11 @@ class DAGScheduler(
val independentStages = removeJobAndIndependentStages(jobId)
independentStages.foreach(taskScheduler.cancelTasks)
val error = new SparkException("Job %d cancelled".format(jobId))
- val job = stageIdToActiveJob(jobId)
+ val job = jobIdToActiveJob(jobId)
job.listener.jobFailed(error)
jobIdToStageIds -= jobId
activeJobs -= job
- stageIdToActiveJob -= jobId
+ jobIdToActiveJob -= jobId
listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error, job.finalStage.id)))
}
}
@@ -1011,7 +1010,7 @@ class DAGScheduler(
val error = new SparkException("Job aborted: " + reason)
job.listener.jobFailed(error)
jobIdToStageIdsRemove(job.jobId)
- stageIdToActiveJob -= resultStage.jobId
+ jobIdToActiveJob -= resultStage.jobId
activeJobs -= job
resultStageToJob -= resultStage
listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error, failedStage.id)))
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index c97543f57d..ce567b0cde 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -428,7 +428,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
assert(scheduler.pendingTasks.isEmpty)
assert(scheduler.activeJobs.isEmpty)
assert(scheduler.failedStages.isEmpty)
- assert(scheduler.stageIdToActiveJob.isEmpty)
+ assert(scheduler.jobIdToActiveJob.isEmpty)
assert(scheduler.jobIdToStageIds.isEmpty)
assert(scheduler.stageIdToJobIds.isEmpty)
assert(scheduler.stageIdToStage.isEmpty)