diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2015-02-24 11:02:47 -0800 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2015-02-24 11:02:47 -0800 |
commit | 64d2c01ff1048de83b9b8efce987b55e457298f9 (patch) | |
tree | 9e4fafdcb49e905e72c9a08ea467bb54b98dfdd5 /core | |
parent | 201236628a344194f7c20ba8e9afeeaefbe9318c (diff) | |
download | spark-64d2c01ff1048de83b9b8efce987b55e457298f9.tar.gz spark-64d2c01ff1048de83b9b8efce987b55e457298f9.tar.bz2 spark-64d2c01ff1048de83b9b8efce987b55e457298f9.zip |
[Spark-5967] [UI] Correctly clean JobProgressListener.stageIdToActiveJobIds
Patch should be self-explanatory
pwendell JoshRosen
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #4741 from tdas/SPARK-5967 and squashes the following commits:
653b5bb [Tathagata Das] Fixed the fix and added test
e2de972 [Tathagata Das] Clear stages which have no corresponding active jobs.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala | 3 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala | 22 |
2 files changed, 25 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 0b6fe70bd2..937d95a934 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -203,6 +203,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { for (stageId <- jobData.stageIds) { stageIdToActiveJobIds.get(stageId).foreach { jobsUsingStage => jobsUsingStage.remove(jobEnd.jobId) + if (jobsUsingStage.isEmpty) { + stageIdToActiveJobIds.remove(stageId) + } stageIdToInfo.get(stageId).foreach { stageInfo => if (stageInfo.submissionTime.isEmpty) { // if this stage is pending, it won't complete, so mark it as "skipped": diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index 6019282d2f..730a4b54f5 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -88,6 +88,28 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc listener.completedStages.map(_.stageId).toSet should be (Set(50, 49, 48, 47, 46)) } + test("test clearing of stageIdToActiveJobs") { + val conf = new SparkConf() + conf.set("spark.ui.retainedStages", 5.toString) + val listener = new JobProgressListener(conf) + val jobId = 0 + val stageIds = 1 to 50 + // Start a job with 50 stages + listener.onJobStart(createJobStartEvent(jobId, stageIds)) + for (stageId <- stageIds) { + listener.onStageSubmitted(createStageStartEvent(stageId)) + } + listener.stageIdToActiveJobIds.size should be > 0 + + // Complete the stages and job + for (stageId <- stageIds) { + listener.onStageCompleted(createStageEndEvent(stageId, failed = false)) + } + listener.onJobEnd(createJobEndEvent(jobId, false)) + assertActiveJobsStateIsEmpty(listener) + listener.stageIdToActiveJobIds.size should be (0) + } + test("test LRU eviction of jobs") { val conf = new SparkConf() conf.set("spark.ui.retainedStages", 5.toString) |