author    Rajesh Balamohan <rbalamohan@apache.org>    2016-08-10 15:30:22 -0700
committer Marcelo Vanzin <vanzin@cloudera.com>        2016-08-10 15:30:52 -0700
commit    bd2c12fb4994785d5becce541aee9ba73fef1c4c (patch)
tree      0baac3281868c4da4ca66ef473a901c68b30c519 /core/src/test/scala/org
parent    bf5cb8af4a649e0c7ac565891427484eab9ee5d9 (diff)
[SPARK-12920][CORE] Honor "spark.ui.retainedStages" to reduce mem-pressure
When a large number of jobs run concurrently through the Spark thrift server, the server ends up at high CPU due to GC pressure: the job UI's retention of stage data causes memory pressure with large jobs. A profiler snapshot is attached at https://issues.apache.org/jira/secure/attachment/12783302/SPARK-12920.profiler_job_progress_listner.png. This PR honors `spark.ui.retainedStages` strictly to reduce memory pressure.

Tested with manual and unit tests.

Author: Rajesh Balamohan <rbalamohan@apache.org>

Closes #10846 from rajeshbalamohan/SPARK-12920.
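For context on the knob being honored: `spark.ui.retainedStages` (and its sibling `spark.ui.retainedJobs`) are ordinary Spark configuration entries. A minimal sketch of setting them on a SparkConf, with illustrative values that mirror the tests below:

    import org.apache.spark.SparkConf

    // Bound how many completed stages and jobs the UI retains in memory.
    // The value 5 is illustrative and mirrors the updated tests below.
    val conf = new SparkConf()
      .set("spark.ui.retainedStages", "5")
      .set("spark.ui.retainedJobs", "5")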
Diffstat (limited to 'core/src/test/scala/org')
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala | 50
1 file changed, 34 insertions(+), 16 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index edab727fc4..8418fa74d2 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -84,18 +84,27 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
   }

   test("test LRU eviction of stages") {
+    def runWithListener(listener: JobProgressListener): Unit = {
+      for (i <- 1 to 50) {
+        listener.onStageSubmitted(createStageStartEvent(i))
+        listener.onStageCompleted(createStageEndEvent(i))
+      }
+      assertActiveJobsStateIsEmpty(listener)
+    }
     val conf = new SparkConf()
     conf.set("spark.ui.retainedStages", 5.toString)
-    val listener = new JobProgressListener(conf)
-
-    for (i <- 1 to 50) {
-      listener.onStageSubmitted(createStageStartEvent(i))
-      listener.onStageCompleted(createStageEndEvent(i))
-    }
-    assertActiveJobsStateIsEmpty(listener)
+    var listener = new JobProgressListener(conf)
+    // Test with 5 retainedStages
+    runWithListener(listener)
     listener.completedStages.size should be (5)
     listener.completedStages.map(_.stageId).toSet should be (Set(50, 49, 48, 47, 46))
+
+    // Test with 0 retainedStages
+    conf.set("spark.ui.retainedStages", 0.toString)
+    listener = new JobProgressListener(conf)
+    runWithListener(listener)
+    listener.completedStages.size should be (0)
   }

   test("test clearing of stageIdToActiveJobs") {
@@ -121,20 +130,29 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
   }

   test("test clearing of jobGroupToJobIds") {
+    def runWithListener(listener: JobProgressListener): Unit = {
+      // Run 50 jobs, each with one stage
+      for (jobId <- 0 to 50) {
+        listener.onJobStart(createJobStartEvent(jobId, Seq(0), jobGroup = Some(jobId.toString)))
+        listener.onStageSubmitted(createStageStartEvent(0))
+        listener.onStageCompleted(createStageEndEvent(0, failed = false))
+        listener.onJobEnd(createJobEndEvent(jobId, false))
+      }
+      assertActiveJobsStateIsEmpty(listener)
+    }
     val conf = new SparkConf()
     conf.set("spark.ui.retainedJobs", 5.toString)
-    val listener = new JobProgressListener(conf)
-    // Run 50 jobs, each with one stage
-    for (jobId <- 0 to 50) {
-      listener.onJobStart(createJobStartEvent(jobId, Seq(0), jobGroup = Some(jobId.toString)))
-      listener.onStageSubmitted(createStageStartEvent(0))
-      listener.onStageCompleted(createStageEndEvent(0, failed = false))
-      listener.onJobEnd(createJobEndEvent(jobId, false))
-    }
-    assertActiveJobsStateIsEmpty(listener)
+    var listener = new JobProgressListener(conf)
+    runWithListener(listener)
     // This collection won't become empty, but it should be bounded by spark.ui.retainedJobs
     listener.jobGroupToJobIds.size should be (5)
+
+    // Test with 0 jobs
+    conf.set("spark.ui.retainedJobs", 0.toString)
+    listener = new JobProgressListener(conf)
+    runWithListener(listener)
+    listener.jobGroupToJobIds.size should be (0)
   }

   test("test LRU eviction of jobs") {