author    Andrew Or <andrew@databricks.com>    2015-05-13 16:27:48 -0700
committer Andrew Or <andrew@databricks.com>    2015-05-13 16:27:48 -0700
commit    f6e18388d993d99f768c6d547327e0720ec64224 (patch)
tree      27b011a3cf168f9d50d09f42fb42425c0ae29117
parent    e683182c3e6347afdac0e5658487f80e5e054ef4 (diff)
[SPARK-7608] Clean up old state in RDDOperationGraphListener
This is necessary for streaming and long-running Spark applications.

zsxwing tdas

Author: Andrew Or <andrew@databricks.com>

Closes #6125 from andrewor14/viz-listener-leak and squashes the following commits:

8660949 [Andrew Or] Fix thing + add tests
33c0843 [Andrew Or] Clean up old job state
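Context for the change: RDDOperationGraphListener previously kept an entry in jobIdToStageIds and stageIdToGraph for every job and stage it ever saw, so a streaming or otherwise long-running application accumulated UI graph metadata without bound. The cleanup added below is governed by the existing UI retention settings, which the listener now reads. A minimal sketch of how an application might tune those limits; the values and app name here are illustrative only, not taken from this patch:

  import org.apache.spark.SparkConf

  // Illustrative values only; when unset, the listener falls back to
  // SparkUI.DEFAULT_RETAINED_JOBS / SparkUI.DEFAULT_RETAINED_STAGES.
  val conf = new SparkConf()
    .setAppName("long-running-app")          // hypothetical app name
    .set("spark.ui.retainedJobs", "500")     // bound on retained job graph metadata
    .set("spark.ui.retainedStages", "500")   // bound on retained stage graph metadata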
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala      | 30
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/scope/RDDOperationGraphListenerSuite.scala | 87
2 files changed, 108 insertions(+), 9 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala
index 2884a49f31..f0f7007d77 100644
--- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala
@@ -27,11 +27,16 @@ import org.apache.spark.ui.SparkUI
  * A SparkListener that constructs a DAG of RDD operations.
  */
 private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListener {
-  private val jobIdToStageIds = new mutable.HashMap[Int, Seq[Int]]
-  private val stageIdToGraph = new mutable.HashMap[Int, RDDOperationGraph]
-  private val stageIds = new mutable.ArrayBuffer[Int]
+  private[ui] val jobIdToStageIds = new mutable.HashMap[Int, Seq[Int]]
+  private[ui] val stageIdToGraph = new mutable.HashMap[Int, RDDOperationGraph]
+
+  // Keep track of the order in which these are inserted so we can remove old ones
+  private[ui] val jobIds = new mutable.ArrayBuffer[Int]
+  private[ui] val stageIds = new mutable.ArrayBuffer[Int]
 
   // How many jobs or stages to retain graph metadata for
+  private val retainedJobs =
+    conf.getInt("spark.ui.retainedJobs", SparkUI.DEFAULT_RETAINED_JOBS)
   private val retainedStages =
     conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES)
 
@@ -50,15 +55,22 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListen
   /** On job start, construct a RDDOperationGraph for each stage in the job for display later. */
   override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
     val jobId = jobStart.jobId
-    val stageInfos = jobStart.stageInfos
+    jobIds += jobId
+    jobIdToStageIds(jobId) = jobStart.stageInfos.map(_.stageId).sorted
 
-    stageInfos.foreach { stageInfo =>
-      stageIds += stageInfo.stageId
-      stageIdToGraph(stageInfo.stageId) = RDDOperationGraph.makeOperationGraph(stageInfo)
+    // Remove state for old jobs
+    if (jobIds.size >= retainedJobs) {
+      val toRemove = math.max(retainedJobs / 10, 1)
+      jobIds.take(toRemove).foreach { id => jobIdToStageIds.remove(id) }
+      jobIds.trimStart(toRemove)
     }
-    jobIdToStageIds(jobId) = stageInfos.map(_.stageId).sorted
+  }
 
-    // Remove graph metadata for old stages
+  /** Remove graph metadata for old stages */
+  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized {
+    val stageInfo = stageSubmitted.stageInfo
+    stageIds += stageInfo.stageId
+    stageIdToGraph(stageInfo.stageId) = RDDOperationGraph.makeOperationGraph(stageInfo)
     if (stageIds.size >= retainedStages) {
       val toRemove = math.max(retainedStages / 10, 1)
       stageIds.take(toRemove).foreach { id => stageIdToGraph.remove(id) }
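For readers skimming the diff: the listener now applies to jobs the same bounded-eviction pattern it already used for stages, tracking insertion order in an ArrayBuffer and dropping the oldest ~10% (at least one entry) from both the buffer and the backing map once the retention threshold is reached. Below is a simplified, standalone sketch of that pattern; the class and method names are illustrative and are not part of Spark's API.

  import scala.collection.mutable

  // Hypothetical standalone rendering of the eviction logic above (not Spark code).
  class BoundedIndex[V](retained: Int) {
    private val insertionOrder = new mutable.ArrayBuffer[Int]
    private val data = new mutable.HashMap[Int, V]

    def put(id: Int, value: V): Unit = {
      insertionOrder += id
      data(id) = value
      if (insertionOrder.size >= retained) {
        // Evict the oldest ~10% of entries, but always at least one
        val toRemove = math.max(retained / 10, 1)
        insertionOrder.take(toRemove).foreach { old => data.remove(old) }
        insertionOrder.trimStart(toRemove)
      }
    }

    def get(id: Int): Option[V] = data.get(id)
    def size: Int = data.size
  }

Keeping insertion order in a separate buffer lets the listener evict strictly the oldest entries, which the new test below verifies by checking that job 0 and stage 0 are the ones that disappear.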
diff --git a/core/src/test/scala/org/apache/spark/ui/scope/RDDOperationGraphListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/scope/RDDOperationGraphListenerSuite.scala
new file mode 100644
index 0000000000..619b38ac02
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/ui/scope/RDDOperationGraphListenerSuite.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.scope
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkConf
+import org.apache.spark.scheduler.{SparkListenerJobStart, SparkListenerStageSubmitted, StageInfo}
+
+class RDDOperationGraphListenerSuite extends FunSuite {
+  private var jobIdCounter = 0
+  private var stageIdCounter = 0
+
+  /** Run a job with the specified number of stages. */
+  private def runOneJob(numStages: Int, listener: RDDOperationGraphListener): Unit = {
+    assert(numStages > 0, "I will not run a job with 0 stages for you.")
+    val stageInfos = (0 until numStages).map { _ =>
+      val stageInfo = new StageInfo(stageIdCounter, 0, "s", 0, Seq.empty, Seq.empty, "d")
+      listener.onStageSubmitted(new SparkListenerStageSubmitted(stageInfo))
+      stageIdCounter += 1
+      stageInfo
+    }
+    listener.onJobStart(new SparkListenerJobStart(jobIdCounter, 0, stageInfos))
+    jobIdCounter += 1
+  }
+
+  test("listener cleans up metadata") {
+
+    val conf = new SparkConf()
+      .set("spark.ui.retainedStages", "10")
+      .set("spark.ui.retainedJobs", "10")
+
+    val listener = new RDDOperationGraphListener(conf)
+    assert(listener.jobIdToStageIds.isEmpty)
+    assert(listener.stageIdToGraph.isEmpty)
+    assert(listener.jobIds.isEmpty)
+    assert(listener.stageIds.isEmpty)
+
+    // Run a few jobs, but not enough for clean up yet
+    runOneJob(1, listener)
+    runOneJob(2, listener)
+    runOneJob(3, listener)
+    assert(listener.jobIdToStageIds.size === 3)
+    assert(listener.stageIdToGraph.size === 6)
+    assert(listener.jobIds.size === 3)
+    assert(listener.stageIds.size === 6)
+
+    // Run a few more, but this time the stages should be cleaned up, but not the jobs
+    runOneJob(5, listener)
+    runOneJob(100, listener)
+    assert(listener.jobIdToStageIds.size === 5)
+    assert(listener.stageIdToGraph.size === 9)
+    assert(listener.jobIds.size === 5)
+    assert(listener.stageIds.size === 9)
+
+    // Run a few more, but this time both jobs and stages should be cleaned up
+    (1 to 100).foreach { _ =>
+      runOneJob(1, listener)
+    }
+    assert(listener.jobIdToStageIds.size === 9)
+    assert(listener.stageIdToGraph.size === 9)
+    assert(listener.jobIds.size === 9)
+    assert(listener.stageIds.size === 9)
+
+    // Ensure we clean up old jobs and stages, not arbitrary ones
+    assert(!listener.jobIdToStageIds.contains(0))
+    assert(!listener.stageIdToGraph.contains(0))
+    assert(!listener.stageIds.contains(0))
+    assert(!listener.jobIds.contains(0))
+  }
+
+}
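A note on the expected sizes in the assertions above: with both retention limits set to 10, the size >= retained check fires as soon as the tenth entry is inserted, and math.max(10 / 10, 1) = 1 entry is evicted, so jobIdToStageIds, stageIdToGraph, jobIds and stageIds all hover at 9 entries under sustained load. With a larger limit such as 1000, the same arithmetic would evict 100 entries at a time.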