Diffstat (limited to 'core/src/test/scala/org')
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala | 50
1 file changed, 34 insertions(+), 16 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index edab727fc4..8418fa74d2 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -84,18 +84,27 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
   }
 
   test("test LRU eviction of stages") {
+    def runWithListener(listener: JobProgressListener): Unit = {
+      for (i <- 1 to 50) {
+        listener.onStageSubmitted(createStageStartEvent(i))
+        listener.onStageCompleted(createStageEndEvent(i))
+      }
+      assertActiveJobsStateIsEmpty(listener)
+    }
     val conf = new SparkConf()
     conf.set("spark.ui.retainedStages", 5.toString)
-    val listener = new JobProgressListener(conf)
-
-    for (i <- 1 to 50) {
-      listener.onStageSubmitted(createStageStartEvent(i))
-      listener.onStageCompleted(createStageEndEvent(i))
-    }
-    assertActiveJobsStateIsEmpty(listener)
+    var listener = new JobProgressListener(conf)
+    // Test with 5 retainedStages
+    runWithListener(listener)
     listener.completedStages.size should be (5)
     listener.completedStages.map(_.stageId).toSet should be (Set(50, 49, 48, 47, 46))
+
+    // Test with 0 retainedStages
+    conf.set("spark.ui.retainedStages", 0.toString)
+    listener = new JobProgressListener(conf)
+    runWithListener(listener)
+    listener.completedStages.size should be (0)
   }
 
   test("test clearing of stageIdToActiveJobs") {
@@ -121,20 +130,29 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
   }
 
   test("test clearing of jobGroupToJobIds") {
+    def runWithListener(listener: JobProgressListener): Unit = {
+      // Run 50 jobs, each with one stage
+      for (jobId <- 0 to 50) {
+        listener.onJobStart(createJobStartEvent(jobId, Seq(0), jobGroup = Some(jobId.toString)))
+        listener.onStageSubmitted(createStageStartEvent(0))
+        listener.onStageCompleted(createStageEndEvent(0, failed = false))
+        listener.onJobEnd(createJobEndEvent(jobId, false))
+      }
+      assertActiveJobsStateIsEmpty(listener)
+    }
     val conf = new SparkConf()
     conf.set("spark.ui.retainedJobs", 5.toString)
-    val listener = new JobProgressListener(conf)
-    // Run 50 jobs, each with one stage
-    for (jobId <- 0 to 50) {
-      listener.onJobStart(createJobStartEvent(jobId, Seq(0), jobGroup = Some(jobId.toString)))
-      listener.onStageSubmitted(createStageStartEvent(0))
-      listener.onStageCompleted(createStageEndEvent(0, failed = false))
-      listener.onJobEnd(createJobEndEvent(jobId, false))
-    }
-    assertActiveJobsStateIsEmpty(listener)
+    var listener = new JobProgressListener(conf)
+    runWithListener(listener)
     // This collection won't become empty, but it should be bounded by spark.ui.retainedJobs
     listener.jobGroupToJobIds.size should be (5)
+
+    // Test with 0 retainedJobs
+    conf.set("spark.ui.retainedJobs", 0.toString)
+    listener = new JobProgressListener(conf)
+    runWithListener(listener)
+    listener.jobGroupToJobIds.size should be (0)
   }
 
   test("test LRU eviction of jobs") {