aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Tathagata Das <tathagata.das1565@gmail.com> 2015-02-24 11:02:47 -0800
committer Andrew Or <andrew@databricks.com> 2015-02-24 11:02:47 -0800
commit 64d2c01ff1048de83b9b8efce987b55e457298f9 (patch)
tree 9e4fafdcb49e905e72c9a08ea467bb54b98dfdd5
parent 201236628a344194f7c20ba8e9afeeaefbe9318c (diff)
download spark-64d2c01ff1048de83b9b8efce987b55e457298f9.tar.gz
spark-64d2c01ff1048de83b9b8efce987b55e457298f9.tar.bz2
spark-64d2c01ff1048de83b9b8efce987b55e457298f9.zip
[SPARK-5967] [UI] Correctly clean JobProgressListener.stageIdToActiveJobIds
Patch should be self-explanatory.

pwendell JoshRosen

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #4741 from tdas/SPARK-5967 and squashes the following commits:

653b5bb [Tathagata Das] Fixed the fix and added test
e2de972 [Tathagata Das] Clear stages which have no corresponding active jobs.
-rw-r--r-- core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala 3
-rw-r--r-- core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala 22
2 files changed, 25 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 0b6fe70bd2..937d95a934 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -203,6 +203,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
for (stageId <- jobData.stageIds) {
stageIdToActiveJobIds.get(stageId).foreach { jobsUsingStage =>
jobsUsingStage.remove(jobEnd.jobId)
+ if (jobsUsingStage.isEmpty) {
+ stageIdToActiveJobIds.remove(stageId)
+ }
stageIdToInfo.get(stageId).foreach { stageInfo =>
if (stageInfo.submissionTime.isEmpty) {
// if this stage is pending, it won't complete, so mark it as "skipped":
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 6019282d2f..730a4b54f5 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -88,6 +88,28 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
listener.completedStages.map(_.stageId).toSet should be (Set(50, 49, 48, 47, 46))
}
+ test("test clearing of stageIdToActiveJobs") {
+ val conf = new SparkConf()
+ conf.set("spark.ui.retainedStages", 5.toString)
+ val listener = new JobProgressListener(conf)
+ val jobId = 0
+ val stageIds = 1 to 50
+ // Start a job with 50 stages
+ listener.onJobStart(createJobStartEvent(jobId, stageIds))
+ for (stageId <- stageIds) {
+ listener.onStageSubmitted(createStageStartEvent(stageId))
+ }
+ listener.stageIdToActiveJobIds.size should be > 0
+
+ // Complete the stages and job
+ for (stageId <- stageIds) {
+ listener.onStageCompleted(createStageEndEvent(stageId, failed = false))
+ }
+ listener.onJobEnd(createJobEndEvent(jobId, false))
+ assertActiveJobsStateIsEmpty(listener)
+ listener.stageIdToActiveJobIds.size should be (0)
+ }
+
test("test LRU eviction of jobs") {
val conf = new SparkConf()
conf.set("spark.ui.retainedStages", 5.toString)