diff options
author | Patrick Wendell <pwendell@gmail.com> | 2014-04-03 22:13:56 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-04-03 22:13:56 -0700 |
commit | ee6e9e7d863022304ac9ced405b353b63accb6ab (patch) | |
tree | d5fe7d96e8e8613c5b07ed638f3bafe1c1d38942 | |
parent | 33e63618d061eeaae257a7350ea3287a702fc123 (diff) | |
download | spark-ee6e9e7d863022304ac9ced405b353b63accb6ab.tar.gz spark-ee6e9e7d863022304ac9ced405b353b63accb6ab.tar.bz2 spark-ee6e9e7d863022304ac9ced405b353b63accb6ab.zip |
SPARK-1337: Application web UI garbage collects newest stages
Simple fix...
Author: Patrick Wendell <pwendell@gmail.com>
Closes #320 from pwendell/stage-clean-up and squashes the following commits:
29be62e [Patrick Wendell] SPARK-1337: Application web UI garbage collects newest stages instead old ones
-rw-r--r-- | core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala | 8 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala | 33 |
2 files changed, 35 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index d10aa12b9e..cd4be57227 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -81,8 +81,8 @@ private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener { /** If stages is too large, remove and garbage collect old stages */ private def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized { if (stages.size > retainedStages) { - val toRemove = retainedStages / 10 - stages.takeRight(toRemove).foreach( s => { + val toRemove = math.max(retainedStages / 10, 1) + stages.take(toRemove).foreach { s => stageIdToTaskData.remove(s.stageId) stageIdToTime.remove(s.stageId) stageIdToShuffleRead.remove(s.stageId) @@ -94,8 +94,8 @@ private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener { stageIdToTasksFailed.remove(s.stageId) stageIdToPool.remove(s.stageId) if (stageIdToDescription.contains(s.stageId)) {stageIdToDescription.remove(s.stageId)} - }) - stages.trimEnd(toRemove) + } + stages.trimStart(toRemove) } } diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index d8a3e859f8..67ceee505d 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -18,13 +18,42 @@ package org.apache.spark.ui.jobs import org.scalatest.FunSuite +import org.scalatest.matchers.ShouldMatchers -import org.apache.spark.{LocalSparkContext, SparkContext, Success} +import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, Success} import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics} import org.apache.spark.scheduler._ import org.apache.spark.util.Utils -class JobProgressListenerSuite extends FunSuite with LocalSparkContext { +class JobProgressListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers { + test("test LRU eviction of stages") { + val conf = new SparkConf() + conf.set("spark.ui.retainedStages", 5.toString) + val listener = new JobProgressListener(conf) + + def createStageStartEvent(stageId: Int) = { + val stageInfo = new StageInfo(stageId, stageId.toString, 0, null) + SparkListenerStageSubmitted(stageInfo) + } + + def createStageEndEvent(stageId: Int) = { + val stageInfo = new StageInfo(stageId, stageId.toString, 0, null) + SparkListenerStageCompleted(stageInfo) + } + + for (i <- 1 to 50) { + listener.onStageSubmitted(createStageStartEvent(i)) + listener.onStageCompleted(createStageEndEvent(i)) + } + + listener.completedStages.size should be (5) + listener.completedStages.filter(_.stageId == 50).size should be (1) + listener.completedStages.filter(_.stageId == 49).size should be (1) + listener.completedStages.filter(_.stageId == 48).size should be (1) + listener.completedStages.filter(_.stageId == 47).size should be (1) + listener.completedStages.filter(_.stageId == 46).size should be (1) + } + test("test executor id to summary") { val sc = new SparkContext("local", "test") val listener = new JobProgressListener(sc.conf) |