author     cenyuhai <cenyuhai@didichuxing.com>   2016-09-12 11:52:56 +0100
committer  Sean Owen <sowen@cloudera.com>        2016-09-12 11:52:56 +0100
commit     cc87280fcd065b01667ca7a59a1a32c7ab757355 (patch)
tree       7d6e2525933879ab10f6cb4ad0cfb2ca6ce2de71 /core/src
parent     72eec70bdbf6fb67c977463db5d8d95dd3040ae8 (diff)
[SPARK-17171][WEB UI] DAG will list all partitions in the graph
## What changes were proposed in this pull request?

The DAG visualization currently lists every partition in the graph, which is slow to render and makes the whole graph hard to read. Usually we do not want to see all partitions; we just want to see the structure of the DAG. So this change shows only 2 root nodes for the RDDs.

Before this PR, the DAG graph looks like [dag1.png](https://issues.apache.org/jira/secure/attachment/12824702/dag1.png) and [dag3.png](https://issues.apache.org/jira/secure/attachment/12825456/dag3.png); after this PR, it looks like [dag2.png](https://issues.apache.org/jira/secure/attachment/12824703/dag2.png) and [dag4.png](https://issues.apache.org/jira/secure/attachment/12825457/dag4.png).

Author: cenyuhai <cenyuhai@didichuxing.com>
Author: 岑玉海 <261810726@qq.com>

Closes #14737 from cenyuhai/SPARK-17171.
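The number of retained root RDD nodes is controlled by the new `spark.ui.dagGraph.retainedRootRDDs` setting introduced in the listener change below (default `Int.MaxValue`, i.e. no pruning). A minimal sketch of how a user might lower it; the application name and the value `100` are illustrative assumptions, not part of the patch:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical usage: keep at most 100 root RDD nodes per stage in the DAG graph.
// The config key comes from this patch; the value and app name are examples only.
val conf = new SparkConf()
  .setAppName("dag-graph-pruning-demo")
  .set("spark.ui.dagGraph.retainedRootRDDs", "100")

val sc = new SparkContext(conf)
```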
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala          | 35
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala  |  6
2 files changed, 33 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
index 84ca750e1a..0e330879d5 100644
--- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
+++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
@@ -26,7 +26,7 @@ import org.apache.commons.lang3.StringEscapeUtils
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.StageInfo
-import org.apache.spark.storage.StorageLevel
+import org.apache.spark.storage.{RDDInfo, StorageLevel}
/**
* A representation of a generic cluster graph used for storing information on RDD operations.
@@ -107,7 +107,7 @@ private[ui] object RDDOperationGraph extends Logging {
* supporting in the future if we decide to group certain stages within the same job under
* a common scope (e.g. part of a SQL query).
*/
- def makeOperationGraph(stage: StageInfo): RDDOperationGraph = {
+ def makeOperationGraph(stage: StageInfo, retainedNodes: Int): RDDOperationGraph = {
val edges = new ListBuffer[RDDOperationEdge]
val nodes = new mutable.HashMap[Int, RDDOperationNode]
val clusters = new mutable.HashMap[String, RDDOperationCluster] // indexed by cluster ID
@@ -119,18 +119,37 @@ private[ui] object RDDOperationGraph extends Logging {
{ if (stage.attemptId == 0) "" else s" (attempt ${stage.attemptId})" }
val rootCluster = new RDDOperationCluster(stageClusterId, stageClusterName)
+ var rootNodeCount = 0
+ val addRDDIds = new mutable.HashSet[Int]()
+ val dropRDDIds = new mutable.HashSet[Int]()
+
// Find nodes, edges, and operation scopes that belong to this stage
- stage.rddInfos.foreach { rdd =>
- edges ++= rdd.parentIds.map { parentId => RDDOperationEdge(parentId, rdd.id) }
+ stage.rddInfos.sortBy(_.id).foreach { rdd =>
+ val parentIds = rdd.parentIds
+ val isAllowed =
+ if (parentIds.isEmpty) {
+ rootNodeCount += 1
+ rootNodeCount <= retainedNodes
+ } else {
+ parentIds.exists(id => addRDDIds.contains(id) || !dropRDDIds.contains(id))
+ }
+
+ if (isAllowed) {
+ addRDDIds += rdd.id
+ edges ++= parentIds.filter(id => !dropRDDIds.contains(id)).map(RDDOperationEdge(_, rdd.id))
+ } else {
+ dropRDDIds += rdd.id
+ }
// TODO: differentiate between the intention to cache an RDD and whether it's actually cached
val node = nodes.getOrElseUpdate(rdd.id, RDDOperationNode(
rdd.id, rdd.name, rdd.storageLevel != StorageLevel.NONE, rdd.callSite))
-
if (rdd.scope.isEmpty) {
// This RDD has no encompassing scope, so we put it directly in the root cluster
// This should happen only if an RDD is instantiated outside of a public RDD API
- rootCluster.attachChildNode(node)
+ if (isAllowed) {
+ rootCluster.attachChildNode(node)
+ }
} else {
// Otherwise, this RDD belongs to an inner cluster,
// which may be nested inside of other clusters
@@ -154,7 +173,9 @@ private[ui] object RDDOperationGraph extends Logging {
rootCluster.attachChildCluster(cluster)
}
}
- rddClusters.lastOption.foreach { cluster => cluster.attachChildNode(node) }
+ if (isAllowed) {
+ rddClusters.lastOption.foreach { cluster => cluster.attachChildNode(node) }
+ }
}
}
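For readers skimming the hunk above: the pruning rule keeps the first `retainedNodes` root RDDs (those with no parents), and keeps a child RDD if at least one of its parents was kept (or was never processed); everything else is dropped along with its incoming edges. Below is a self-contained sketch of that rule, using a simplified `SimpleRdd(id, parentIds)` stand-in for `RDDInfo`; the type, method name, and sample data are assumptions made for illustration, not part of the patch:

```scala
import scala.collection.mutable

// Simplified stand-in for RDDInfo: an RDD id plus the ids of its parents.
case class SimpleRdd(id: Int, parentIds: Seq[Int])

/** Return the ids of RDDs that would be retained, mirroring the logic in makeOperationGraph. */
def retainedRddIds(rdds: Seq[SimpleRdd], retainedNodes: Int): Set[Int] = {
  val addRDDIds = new mutable.HashSet[Int]()
  val dropRDDIds = new mutable.HashSet[Int]()
  var rootNodeCount = 0

  // Process RDDs in id order so parents are seen before their children.
  rdds.sortBy(_.id).foreach { rdd =>
    val isAllowed =
      if (rdd.parentIds.isEmpty) {
        // A root RDD: keep only the first `retainedNodes` of them.
        rootNodeCount += 1
        rootNodeCount <= retainedNodes
      } else {
        // A child RDD: keep it if some parent was kept, or was never seen at all.
        rdd.parentIds.exists(id => addRDDIds.contains(id) || !dropRDDIds.contains(id))
      }
    if (isAllowed) addRDDIds += rdd.id else dropRDDIds += rdd.id
  }
  addRDDIds.toSet
}

// Example: with two retained roots, the third root (id 2) and its child (id 5) are dropped.
val rdds = Seq(
  SimpleRdd(0, Nil), SimpleRdd(1, Nil), SimpleRdd(2, Nil),
  SimpleRdd(3, Seq(0)), SimpleRdd(4, Seq(1)), SimpleRdd(5, Seq(2)))
println(retainedRddIds(rdds, retainedNodes = 2))  // e.g. Set(0, 1, 3, 4)
```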
diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala
index bcae56e2f1..37a12a8646 100644
--- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala
@@ -41,6 +41,10 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListen
private[ui] val jobIds = new mutable.ArrayBuffer[Int]
private[ui] val stageIds = new mutable.ArrayBuffer[Int]
+ // How many root nodes to retain in DAG Graph
+ private[ui] val retainedNodes =
+ conf.getInt("spark.ui.dagGraph.retainedRootRDDs", Int.MaxValue)
+
// How many jobs or stages to retain graph metadata for
private val retainedJobs =
conf.getInt("spark.ui.retainedJobs", SparkUI.DEFAULT_RETAINED_JOBS)
@@ -82,7 +86,7 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListen
val stageId = stageInfo.stageId
stageIds += stageId
stageIdToJobId(stageId) = jobId
- stageIdToGraph(stageId) = RDDOperationGraph.makeOperationGraph(stageInfo)
+ stageIdToGraph(stageId) = RDDOperationGraph.makeOperationGraph(stageInfo, retainedNodes)
trimStagesIfNecessary()
}