diff options
Diffstat (limited to 'core/src/test/scala/org')
-rw-r--r-- | core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala | 50 |
1 files changed, 34 insertions, 16 deletions
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index edab727fc4..8418fa74d2 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -84,18 +84,27 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with } test("test LRU eviction of stages") { + def runWithListener(listener: JobProgressListener) : Unit = { + for (i <- 1 to 50) { + listener.onStageSubmitted(createStageStartEvent(i)) + listener.onStageCompleted(createStageEndEvent(i)) + } + assertActiveJobsStateIsEmpty(listener) + } val conf = new SparkConf() conf.set("spark.ui.retainedStages", 5.toString) - val listener = new JobProgressListener(conf) - - for (i <- 1 to 50) { - listener.onStageSubmitted(createStageStartEvent(i)) - listener.onStageCompleted(createStageEndEvent(i)) - } - assertActiveJobsStateIsEmpty(listener) + var listener = new JobProgressListener(conf) + // Test with 5 retainedStages + runWithListener(listener) listener.completedStages.size should be (5) listener.completedStages.map(_.stageId).toSet should be (Set(50, 49, 48, 47, 46)) + + // Test with 0 retainedStages + conf.set("spark.ui.retainedStages", 0.toString) + listener = new JobProgressListener(conf) + runWithListener(listener) + listener.completedStages.size should be (0) } test("test clearing of stageIdToActiveJobs") { @@ -121,20 +130,29 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with } test("test clearing of jobGroupToJobIds") { + def runWithListener(listener: JobProgressListener): Unit = { + // Run 50 jobs, each with one stage + for (jobId <- 0 to 50) { + listener.onJobStart(createJobStartEvent(jobId, Seq(0), jobGroup = Some(jobId.toString))) + listener.onStageSubmitted(createStageStartEvent(0)) + listener.onStageCompleted(createStageEndEvent(0, failed = false)) + listener.onJobEnd(createJobEndEvent(jobId, false)) + } + assertActiveJobsStateIsEmpty(listener) + } val conf = new SparkConf() conf.set("spark.ui.retainedJobs", 5.toString) - val listener = new JobProgressListener(conf) - // Run 50 jobs, each with one stage - for (jobId <- 0 to 50) { - listener.onJobStart(createJobStartEvent(jobId, Seq(0), jobGroup = Some(jobId.toString))) - listener.onStageSubmitted(createStageStartEvent(0)) - listener.onStageCompleted(createStageEndEvent(0, failed = false)) - listener.onJobEnd(createJobEndEvent(jobId, false)) - } - assertActiveJobsStateIsEmpty(listener) + var listener = new JobProgressListener(conf) + runWithListener(listener) // This collection won't become empty, but it should be bounded by spark.ui.retainedJobs listener.jobGroupToJobIds.size should be (5) + + // Test with 0 jobs + conf.set("spark.ui.retainedJobs", 0.toString) + listener = new JobProgressListener(conf) + runWithListener(listener) + listener.jobGroupToJobIds.size should be (0) } test("test LRU eviction of jobs") { |