Diffstat (limited to 'core/src')
-rw-r--r--   core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala           35
-rw-r--r--   core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala    6
2 files changed, 33 insertions, 8 deletions
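
For context: this change caps how many root RDDs (RDDs with no parents) the per-stage DAG visualization retains, controlled by a new spark.ui.dagGraph.retainedRootRDDs setting that defaults to Int.MaxValue (retain everything, the previous behavior). A minimal sketch of how an application might opt in, assuming a standard SparkConf setup; the app name and master below are placeholders:

import org.apache.spark.{SparkConf, SparkContext}

// Keep at most 200 root RDDs per stage in the DAG visualization.
// When the key is unset, the listener falls back to Int.MaxValue,
// i.e. no pruning.
val conf = new SparkConf()
  .setAppName("dag-graph-demo") // placeholder app name
  .setMaster("local[*]")        // placeholder master
  .set("spark.ui.dagGraph.retainedRootRDDs", "200")

val sc = new SparkContext(conf)
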
diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
index 84ca750e1a..0e330879d5 100644
--- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
+++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
@@ -26,7 +26,7 @@ import org.apache.commons.lang3.StringEscapeUtils
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.StageInfo
-import org.apache.spark.storage.StorageLevel
+import org.apache.spark.storage.{RDDInfo, StorageLevel}
/**
* A representation of a generic cluster graph used for storing information on RDD operations.
@@ -107,7 +107,7 @@ private[ui] object RDDOperationGraph extends Logging {
* supporting in the future if we decide to group certain stages within the same job under
* a common scope (e.g. part of a SQL query).
*/
- def makeOperationGraph(stage: StageInfo): RDDOperationGraph = {
+ def makeOperationGraph(stage: StageInfo, retainedNodes: Int): RDDOperationGraph = {
val edges = new ListBuffer[RDDOperationEdge]
val nodes = new mutable.HashMap[Int, RDDOperationNode]
val clusters = new mutable.HashMap[String, RDDOperationCluster] // indexed by cluster ID
@@ -119,18 +119,37 @@ private[ui] object RDDOperationGraph extends Logging {
{ if (stage.attemptId == 0) "" else s" (attempt ${stage.attemptId})" }
val rootCluster = new RDDOperationCluster(stageClusterId, stageClusterName)
+ var rootNodeCount = 0
+ val addRDDIds = new mutable.HashSet[Int]()
+ val dropRDDIds = new mutable.HashSet[Int]()
+
// Find nodes, edges, and operation scopes that belong to this stage
- stage.rddInfos.foreach { rdd =>
- edges ++= rdd.parentIds.map { parentId => RDDOperationEdge(parentId, rdd.id) }
+ stage.rddInfos.sortBy(_.id).foreach { rdd =>
+ val parentIds = rdd.parentIds
+ val isAllowed =
+ if (parentIds.isEmpty) {
+ rootNodeCount += 1
+ rootNodeCount <= retainedNodes
+ } else {
+ parentIds.exists(id => addRDDIds.contains(id) || !dropRDDIds.contains(id))
+ }
+
+ if (isAllowed) {
+ addRDDIds += rdd.id
+ edges ++= parentIds.filter(id => !dropRDDIds.contains(id)).map(RDDOperationEdge(_, rdd.id))
+ } else {
+ dropRDDIds += rdd.id
+ }
// TODO: differentiate between the intention to cache an RDD and whether it's actually cached
val node = nodes.getOrElseUpdate(rdd.id, RDDOperationNode(
rdd.id, rdd.name, rdd.storageLevel != StorageLevel.NONE, rdd.callSite))
-
if (rdd.scope.isEmpty) {
// This RDD has no encompassing scope, so we put it directly in the root cluster
// This should happen only if an RDD is instantiated outside of a public RDD API
- rootCluster.attachChildNode(node)
+ if (isAllowed) {
+ rootCluster.attachChildNode(node)
+ }
} else {
// Otherwise, this RDD belongs to an inner cluster,
// which may be nested inside of other clusters
@@ -154,7 +173,9 @@ private[ui] object RDDOperationGraph extends Logging {
rootCluster.attachChildCluster(cluster)
}
}
- rddClusters.lastOption.foreach { cluster => cluster.attachChildNode(node) }
+ if (isAllowed) {
+ rddClusters.lastOption.foreach { cluster => cluster.attachChildNode(node) }
+ }
}
}
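
To make the retention rule above easier to verify in isolation, here is a self-contained sketch (Scala REPL style) of just the allow/drop decision from makeOperationGraph. SimpleRDD and retainedIds are illustrative stand-ins for Spark's RDDInfo and the real method, not Spark API:

import scala.collection.mutable

// Stand-in for Spark's RDDInfo: an id plus the ids of its parents.
case class SimpleRDD(id: Int, parentIds: Seq[Int])

// Mirrors the rule above: the first `retainedNodes` parentless RDDs are
// kept, and any other RDD is kept only if at least one parent was kept
// or was never explicitly dropped.
def retainedIds(rdds: Seq[SimpleRDD], retainedNodes: Int): Set[Int] = {
  var rootNodeCount = 0
  val addRDDIds = new mutable.HashSet[Int]()
  val dropRDDIds = new mutable.HashSet[Int]()
  rdds.sortBy(_.id).foreach { rdd =>
    val isAllowed =
      if (rdd.parentIds.isEmpty) {
        rootNodeCount += 1
        rootNodeCount <= retainedNodes
      } else {
        rdd.parentIds.exists(id => addRDDIds.contains(id) || !dropRDDIds.contains(id))
      }
    if (isAllowed) addRDDIds += rdd.id else dropRDDIds += rdd.id
  }
  addRDDIds.toSet
}

// Roots 0 and 1 each feed one child; with retainedNodes = 1 only the
// first root (and its descendant) survives.
val rdds = Seq(SimpleRDD(0, Nil), SimpleRDD(1, Nil),
  SimpleRDD(2, Seq(0)), SimpleRDD(3, Seq(1)))
assert(retainedIds(rdds, retainedNodes = 1) == Set(0, 2))
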
diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala
index bcae56e2f1..37a12a8646 100644
--- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala
@@ -41,6 +41,10 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListen
private[ui] val jobIds = new mutable.ArrayBuffer[Int]
private[ui] val stageIds = new mutable.ArrayBuffer[Int]
+ // How many root nodes to retain in the DAG graph
+ private[ui] val retainedNodes =
+ conf.getInt("spark.ui.dagGraph.retainedRootRDDs", Int.MaxValue)
+
// How many jobs or stages to retain graph metadata for
private val retainedJobs =
conf.getInt("spark.ui.retainedJobs", SparkUI.DEFAULT_RETAINED_JOBS)
@@ -82,7 +86,7 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListen
val stageId = stageInfo.stageId
stageIds += stageId
stageIdToJobId(stageId) = jobId
- stageIdToGraph(stageId) = RDDOperationGraph.makeOperationGraph(stageInfo)
+ stageIdToGraph(stageId) = RDDOperationGraph.makeOperationGraph(stageInfo, retainedNodes)
trimStagesIfNecessary()
}
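
As a usage note, the listener reads the cap with SparkConf.getInt, so values are stored as strings and parsed on read. The sketch below, assuming only a bare SparkConf, shows the fallback to Int.MaxValue when the key is unset:

import org.apache.spark.SparkConf

// loadDefaults = false keeps system properties out of this demo conf.
val conf = new SparkConf(loadDefaults = false)

// Key unset: getInt returns the supplied default, Int.MaxValue here,
// which the listener treats as "retain every root RDD".
assert(conf.getInt("spark.ui.dagGraph.retainedRootRDDs", Int.MaxValue) == Int.MaxValue)

// All SparkConf values are strings; getInt parses on read.
conf.set("spark.ui.dagGraph.retainedRootRDDs", "200")
assert(conf.getInt("spark.ui.dagGraph.retainedRootRDDs", Int.MaxValue) == 200)
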