aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2014-04-03 22:13:56 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-04-03 22:13:56 -0700
commitee6e9e7d863022304ac9ced405b353b63accb6ab (patch)
treed5fe7d96e8e8613c5b07ed638f3bafe1c1d38942 /core
parent33e63618d061eeaae257a7350ea3287a702fc123 (diff)
downloadspark-ee6e9e7d863022304ac9ced405b353b63accb6ab.tar.gz
spark-ee6e9e7d863022304ac9ced405b353b63accb6ab.tar.bz2
spark-ee6e9e7d863022304ac9ced405b353b63accb6ab.zip
SPARK-1337: Application web UI garbage collects newest stages
Simple fix... Author: Patrick Wendell <pwendell@gmail.com> Closes #320 from pwendell/stage-clean-up and squashes the following commits: 29be62e [Patrick Wendell] SPARK-1337: Application web UI garbage collects newest stages instead old ones
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala8
-rw-r--r--core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala33
2 files changed, 35 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index d10aa12b9e..cd4be57227 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -81,8 +81,8 @@ private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener {
/** If stages is too large, remove and garbage collect old stages */
private def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
if (stages.size > retainedStages) {
- val toRemove = retainedStages / 10
- stages.takeRight(toRemove).foreach( s => {
+ val toRemove = math.max(retainedStages / 10, 1)
+ stages.take(toRemove).foreach { s =>
stageIdToTaskData.remove(s.stageId)
stageIdToTime.remove(s.stageId)
stageIdToShuffleRead.remove(s.stageId)
@@ -94,8 +94,8 @@ private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener {
stageIdToTasksFailed.remove(s.stageId)
stageIdToPool.remove(s.stageId)
if (stageIdToDescription.contains(s.stageId)) {stageIdToDescription.remove(s.stageId)}
- })
- stages.trimEnd(toRemove)
+ }
+ stages.trimStart(toRemove)
}
}
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index d8a3e859f8..67ceee505d 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -18,13 +18,42 @@
package org.apache.spark.ui.jobs
import org.scalatest.FunSuite
+import org.scalatest.matchers.ShouldMatchers
-import org.apache.spark.{LocalSparkContext, SparkContext, Success}
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, Success}
import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics}
import org.apache.spark.scheduler._
import org.apache.spark.util.Utils
-class JobProgressListenerSuite extends FunSuite with LocalSparkContext {
+class JobProgressListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
+ test("test LRU eviction of stages") {
+ val conf = new SparkConf()
+ conf.set("spark.ui.retainedStages", 5.toString)
+ val listener = new JobProgressListener(conf)
+
+ def createStageStartEvent(stageId: Int) = {
+ val stageInfo = new StageInfo(stageId, stageId.toString, 0, null)
+ SparkListenerStageSubmitted(stageInfo)
+ }
+
+ def createStageEndEvent(stageId: Int) = {
+ val stageInfo = new StageInfo(stageId, stageId.toString, 0, null)
+ SparkListenerStageCompleted(stageInfo)
+ }
+
+ for (i <- 1 to 50) {
+ listener.onStageSubmitted(createStageStartEvent(i))
+ listener.onStageCompleted(createStageEndEvent(i))
+ }
+
+ listener.completedStages.size should be (5)
+ listener.completedStages.filter(_.stageId == 50).size should be (1)
+ listener.completedStages.filter(_.stageId == 49).size should be (1)
+ listener.completedStages.filter(_.stageId == 48).size should be (1)
+ listener.completedStages.filter(_.stageId == 47).size should be (1)
+ listener.completedStages.filter(_.stageId == 46).size should be (1)
+ }
+
test("test executor id to summary") {
val sc = new SparkContext("local", "test")
val listener = new JobProgressListener(sc.conf)