diff options
author | cenyuhai <cenyuhai@didichuxing.com> | 2016-09-12 11:52:56 +0100 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-09-12 11:52:56 +0100 |
commit | cc87280fcd065b01667ca7a59a1a32c7ab757355 (patch) | |
tree | 7d6e2525933879ab10f6cb4ad0cfb2ca6ce2de71 /core/src | |
parent | 72eec70bdbf6fb67c977463db5d8d95dd3040ae8 (diff) | |
download | spark-cc87280fcd065b01667ca7a59a1a32c7ab757355.tar.gz spark-cc87280fcd065b01667ca7a59a1a32c7ab757355.tar.bz2 spark-cc87280fcd065b01667ca7a59a1a32c7ab757355.zip |
[SPARK-17171][WEB UI] DAG will list all partitions in the graph
## What changes were proposed in this pull request?
DAG will list all partitions in the graph, it is too slow and hard to see all graph.
Always we don't want to see all partitions,we just want to see the relations of DAG graph.
So I just show 2 root nodes for Rdds.
Before this PR, the DAG graph looks like [dag1.png](https://issues.apache.org/jira/secure/attachment/12824702/dag1.png), [dag3.png](https://issues.apache.org/jira/secure/attachment/12825456/dag3.png), after this PR, the DAG graph looks like [dag2.png](https://issues.apache.org/jira/secure/attachment/12824703/dag2.png),[dag4.png](https://issues.apache.org/jira/secure/attachment/12825457/dag4.png)
Author: cenyuhai <cenyuhai@didichuxing.com>
Author: 岑玉海 <261810726@qq.com>
Closes #14737 from cenyuhai/SPARK-17171.
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala | 35 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala | 6 |
2 files changed, 33 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala index 84ca750e1a..0e330879d5 100644 --- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala +++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala @@ -26,7 +26,7 @@ import org.apache.commons.lang3.StringEscapeUtils import org.apache.spark.internal.Logging import org.apache.spark.scheduler.StageInfo -import org.apache.spark.storage.StorageLevel +import org.apache.spark.storage.{RDDInfo, StorageLevel} /** * A representation of a generic cluster graph used for storing information on RDD operations. @@ -107,7 +107,7 @@ private[ui] object RDDOperationGraph extends Logging { * supporting in the future if we decide to group certain stages within the same job under * a common scope (e.g. part of a SQL query). */ - def makeOperationGraph(stage: StageInfo): RDDOperationGraph = { + def makeOperationGraph(stage: StageInfo, retainedNodes: Int): RDDOperationGraph = { val edges = new ListBuffer[RDDOperationEdge] val nodes = new mutable.HashMap[Int, RDDOperationNode] val clusters = new mutable.HashMap[String, RDDOperationCluster] // indexed by cluster ID @@ -119,18 +119,37 @@ private[ui] object RDDOperationGraph extends Logging { { if (stage.attemptId == 0) "" else s" (attempt ${stage.attemptId})" } val rootCluster = new RDDOperationCluster(stageClusterId, stageClusterName) + var rootNodeCount = 0 + val addRDDIds = new mutable.HashSet[Int]() + val dropRDDIds = new mutable.HashSet[Int]() + // Find nodes, edges, and operation scopes that belong to this stage - stage.rddInfos.foreach { rdd => - edges ++= rdd.parentIds.map { parentId => RDDOperationEdge(parentId, rdd.id) } + stage.rddInfos.sortBy(_.id).foreach { rdd => + val parentIds = rdd.parentIds + val isAllowed = + if (parentIds.isEmpty) { + rootNodeCount += 1 + rootNodeCount <= retainedNodes + } else { + parentIds.exists(id => addRDDIds.contains(id) || !dropRDDIds.contains(id)) + } + + if (isAllowed) { + addRDDIds += rdd.id + edges ++= parentIds.filter(id => !dropRDDIds.contains(id)).map(RDDOperationEdge(_, rdd.id)) + } else { + dropRDDIds += rdd.id + } // TODO: differentiate between the intention to cache an RDD and whether it's actually cached val node = nodes.getOrElseUpdate(rdd.id, RDDOperationNode( rdd.id, rdd.name, rdd.storageLevel != StorageLevel.NONE, rdd.callSite)) - if (rdd.scope.isEmpty) { // This RDD has no encompassing scope, so we put it directly in the root cluster // This should happen only if an RDD is instantiated outside of a public RDD API - rootCluster.attachChildNode(node) + if (isAllowed) { + rootCluster.attachChildNode(node) + } } else { // Otherwise, this RDD belongs to an inner cluster, // which may be nested inside of other clusters @@ -154,7 +173,9 @@ private[ui] object RDDOperationGraph extends Logging { rootCluster.attachChildCluster(cluster) } } - rddClusters.lastOption.foreach { cluster => cluster.attachChildNode(node) } + if (isAllowed) { + rddClusters.lastOption.foreach { cluster => cluster.attachChildNode(node) } + } } } diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala index bcae56e2f1..37a12a8646 100644 --- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala @@ -41,6 +41,10 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListen private[ui] val jobIds = new mutable.ArrayBuffer[Int] private[ui] val stageIds = new mutable.ArrayBuffer[Int] + // How many root nodes to retain in DAG Graph + private[ui] val retainedNodes = + conf.getInt("spark.ui.dagGraph.retainedRootRDDs", Int.MaxValue) + // How many jobs or stages to retain graph metadata for private val retainedJobs = conf.getInt("spark.ui.retainedJobs", SparkUI.DEFAULT_RETAINED_JOBS) @@ -82,7 +86,7 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListen val stageId = stageInfo.stageId stageIds += stageId stageIdToJobId(stageId) = jobId - stageIdToGraph(stageId) = RDDOperationGraph.makeOperationGraph(stageInfo) + stageIdToGraph(stageId) = RDDOperationGraph.makeOperationGraph(stageInfo, retainedNodes) trimStagesIfNecessary() } |