From 7d877c3439d872ec2a9e07d245e9c96174c0cf00 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Mon, 25 Jan 2016 12:44:20 -0800 Subject: [SPARK-12902] [SQL] visualization for generated operators This PR brings back visualization for generated operators, they looks like: ![sql](https://cloud.githubusercontent.com/assets/40902/12460920/0dc7956a-bf6b-11e5-9c3f-8389f452526e.png) ![stage](https://cloud.githubusercontent.com/assets/40902/12460923/11806ac4-bf6b-11e5-9c72-e84a62c5ea93.png) Note: SQL metrics are not supported right now, because they are very slow, will be supported once we have batch mode. Author: Davies Liu Closes #10828 from davies/viz_codegen. --- .../org/apache/spark/ui/static/spark-dag-viz.js | 2 +- .../apache/spark/ui/scope/RDDOperationGraph.scala | 6 +- .../sql/execution/ui/static/spark-sql-viz.css | 6 ++ .../apache/spark/sql/execution/SparkPlanInfo.scala | 9 +- .../spark/sql/execution/ui/ExecutionPage.scala | 4 +- .../spark/sql/execution/ui/SQLListener.scala | 2 +- .../spark/sql/execution/ui/SparkPlanGraph.scala | 95 ++++++++++++++++------ .../sql/execution/metric/SQLMetricsSuite.scala | 10 ++- .../spark/sql/execution/ui/SQLListenerSuite.scala | 2 +- 9 files changed, 104 insertions(+), 32 deletions(-) diff --git a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js index 83dbea40b6..4337c42087 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js +++ b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js @@ -284,7 +284,7 @@ function renderDot(dot, container, forJob) { renderer(container, g); // Find the stage cluster and mark it for styling and post-processing - container.selectAll("g.cluster[name*=\"Stage\"]").classed("stage", true); + container.selectAll("g.cluster[name^=\"Stage \"]").classed("stage", true); } /* -------------------- * diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala index 06da74f1b6..003c218aad 100644 --- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala +++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala @@ -130,7 +130,11 @@ private[ui] object RDDOperationGraph extends Logging { } } // Attach the outermost cluster to the root cluster, and the RDD to the innermost cluster - rddClusters.headOption.foreach { cluster => rootCluster.attachChildCluster(cluster) } + rddClusters.headOption.foreach { cluster => + if (!rootCluster.childClusters.contains(cluster)) { + rootCluster.attachChildCluster(cluster) + } + } rddClusters.lastOption.foreach { cluster => cluster.attachChildNode(node) } } } diff --git a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css index ddd3a91dd8..303f8ebb88 100644 --- a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css +++ b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css @@ -20,6 +20,12 @@ text-shadow: none; } +#plan-viz-graph svg g.cluster rect { + fill: #A0DFFF; + stroke: #3EC0FF; + stroke-width: 1px; +} + #plan-viz-graph svg g.node rect { fill: #C3EBFF; stroke: #3EC0FF; diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala index 4f750ad13a..4dd9928244 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala @@ -36,12 +36,17 @@ class SparkPlanInfo( private[sql] object SparkPlanInfo { def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = { + val children = plan match { + case WholeStageCodegen(child, _) => child :: Nil + case InputAdapter(child) => child :: Nil + case plan => plan.children + } val metrics = plan.metrics.toSeq.map { case (key, metric) => new SQLMetricInfo(metric.name.getOrElse(key), metric.id, Utils.getFormattedClassName(metric.param)) } - val children = plan.children.map(fromSparkPlan) - new SparkPlanInfo(plan.nodeName, plan.simpleString, children, plan.metadata, metrics) + new SparkPlanInfo(plan.nodeName, plan.simpleString, children.map(fromSparkPlan), + plan.metadata, metrics) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala index c74ad40406..49915adf6c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala @@ -99,7 +99,7 @@ private[sql] class ExecutionPage(parent: SQLTab) extends WebUIPage("execution") } private def planVisualization(metrics: Map[Long, String], graph: SparkPlanGraph): Seq[Node] = { - val metadata = graph.nodes.flatMap { node => + val metadata = graph.allNodes.flatMap { node => val nodeId = s"plan-meta-data-${node.id}"
{node.desc}
} @@ -110,7 +110,7 @@ private[sql] class ExecutionPage(parent: SQLTab) extends WebUIPage("execution")
{graph.makeDotFile(metrics)}
-
{graph.nodes.size.toString}
+
{graph.allNodes.size.toString}
{metadata} {planVisualizationResources} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala index cd56136927..83c64f755f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala @@ -219,7 +219,7 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi case SparkListenerSQLExecutionStart(executionId, description, details, physicalPlanDescription, sparkPlanInfo, time) => val physicalPlanGraph = SparkPlanGraph(sparkPlanInfo) - val sqlPlanMetrics = physicalPlanGraph.nodes.flatMap { node => + val sqlPlanMetrics = physicalPlanGraph.allNodes.flatMap { node => node.metrics.map(metric => metric.accumulatorId -> metric) } val executionUIData = new SQLExecutionUIData( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala index 3a6eff9399..4eb248569b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala @@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable -import org.apache.spark.sql.execution.SparkPlanInfo +import org.apache.spark.sql.execution.{InputAdapter, SparkPlanInfo, WholeStageCodegen} import org.apache.spark.sql.execution.metric.SQLMetrics /** @@ -41,6 +41,16 @@ private[ui] case class SparkPlanGraph( dotFile.append("}") dotFile.toString() } + + /** + * All the SparkPlanGraphNodes, including those inside of WholeStageCodegen. + */ + val allNodes: Seq[SparkPlanGraphNode] = { + nodes.flatMap { + case cluster: SparkPlanGraphCluster => cluster.nodes :+ cluster + case node => Seq(node) + } + } } private[sql] object SparkPlanGraph { @@ -52,7 +62,7 @@ private[sql] object SparkPlanGraph { val nodeIdGenerator = new AtomicLong(0) val nodes = mutable.ArrayBuffer[SparkPlanGraphNode]() val edges = mutable.ArrayBuffer[SparkPlanGraphEdge]() - buildSparkPlanGraphNode(planInfo, nodeIdGenerator, nodes, edges) + buildSparkPlanGraphNode(planInfo, nodeIdGenerator, nodes, edges, null, null) new SparkPlanGraph(nodes, edges) } @@ -60,22 +70,40 @@ private[sql] object SparkPlanGraph { planInfo: SparkPlanInfo, nodeIdGenerator: AtomicLong, nodes: mutable.ArrayBuffer[SparkPlanGraphNode], - edges: mutable.ArrayBuffer[SparkPlanGraphEdge]): SparkPlanGraphNode = { - val metrics = planInfo.metrics.map { metric => - SQLPlanMetric(metric.name, metric.accumulatorId, - SQLMetrics.getMetricParam(metric.metricParam)) + edges: mutable.ArrayBuffer[SparkPlanGraphEdge], + parent: SparkPlanGraphNode, + subgraph: SparkPlanGraphCluster): Unit = { + if (planInfo.nodeName == classOf[WholeStageCodegen].getSimpleName) { + val cluster = new SparkPlanGraphCluster( + nodeIdGenerator.getAndIncrement(), + planInfo.nodeName, + planInfo.simpleString, + mutable.ArrayBuffer[SparkPlanGraphNode]()) + nodes += cluster + buildSparkPlanGraphNode( + planInfo.children.head, nodeIdGenerator, nodes, edges, parent, cluster) + } else if (planInfo.nodeName == classOf[InputAdapter].getSimpleName) { + buildSparkPlanGraphNode(planInfo.children.head, nodeIdGenerator, nodes, edges, parent, null) + } else { + val metrics = planInfo.metrics.map { metric => + SQLPlanMetric(metric.name, metric.accumulatorId, + SQLMetrics.getMetricParam(metric.metricParam)) + } + val node = new SparkPlanGraphNode( + nodeIdGenerator.getAndIncrement(), planInfo.nodeName, + planInfo.simpleString, planInfo.metadata, metrics) + if (subgraph == null) { + nodes += node + } else { + subgraph.nodes += node + } + + if (parent != null) { + edges += SparkPlanGraphEdge(node.id, parent.id) + } + planInfo.children.foreach( + buildSparkPlanGraphNode(_, nodeIdGenerator, nodes, edges, node, subgraph)) } - val node = SparkPlanGraphNode( - nodeIdGenerator.getAndIncrement(), planInfo.nodeName, - planInfo.simpleString, planInfo.metadata, metrics) - - nodes += node - val childrenNodes = planInfo.children.map( - child => buildSparkPlanGraphNode(child, nodeIdGenerator, nodes, edges)) - for (child <- childrenNodes) { - edges += SparkPlanGraphEdge(child.id, node.id) - } - node } } @@ -86,12 +114,12 @@ private[sql] object SparkPlanGraph { * @param name the name of this SparkPlan node * @param metrics metrics that this SparkPlan node will track */ -private[ui] case class SparkPlanGraphNode( - id: Long, - name: String, - desc: String, - metadata: Map[String, String], - metrics: Seq[SQLPlanMetric]) { +private[ui] class SparkPlanGraphNode( + val id: Long, + val name: String, + val desc: String, + val metadata: Map[String, String], + val metrics: Seq[SQLPlanMetric]) { def makeDotNode(metricsValue: Map[Long, String]): String = { val builder = new mutable.StringBuilder(name) @@ -117,6 +145,27 @@ private[ui] case class SparkPlanGraphNode( } } +/** + * Represent a tree of SparkPlan for WholeStageCodegen. + */ +private[ui] class SparkPlanGraphCluster( + id: Long, + name: String, + desc: String, + val nodes: mutable.ArrayBuffer[SparkPlanGraphNode]) + extends SparkPlanGraphNode(id, name, desc, Map.empty, Nil) { + + override def makeDotNode(metricsValue: Map[Long, String]): String = { + s""" + | subgraph cluster${id} { + | label=${name}; + | ${nodes.map(_.makeDotNode(metricsValue)).mkString(" \n")} + | } + """.stripMargin + } +} + + /** * Represent an edge in the SparkPlan tree. `fromId` is the parent node id, and `toId` is the child * node id. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index 51285431a4..cbae19ebd2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -86,7 +86,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { // If we can track all jobs, check the metric values val metricValues = sqlContext.listener.getExecutionMetrics(executionId) val actualMetrics = SparkPlanGraph(SparkPlanInfo.fromSparkPlan( - df.queryExecution.executedPlan)).nodes.filter { node => + df.queryExecution.executedPlan)).allNodes.filter { node => expectedMetrics.contains(node.id) }.map { node => val nodeMetrics = node.metrics.map { metric => @@ -134,6 +134,14 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { ) } + test("WholeStageCodegen metrics") { + // Assume the execution plan is + // WholeStageCodegen(nodeId = 0, Range(nodeId = 2) -> Filter(nodeId = 1)) + // TODO: update metrics in generated operators + val df = sqlContext.range(10).filter('id < 5) + testSparkPlanMetrics(df, 1, Map.empty) + } + test("TungstenAggregate metrics") { // Assume the execution plan is // ... -> TungstenAggregate(nodeId = 2) -> Exchange(nodeId = 1) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala index eef3c1f3e3..81a159d542 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala @@ -83,7 +83,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext { val df = createTestDataFrame val accumulatorIds = SparkPlanGraph(SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan)) - .nodes.flatMap(_.metrics.map(_.accumulatorId)) + .allNodes.flatMap(_.metrics.map(_.accumulatorId)) // Assume all accumulators are long var accumulatorValue = 0L val accumulatorUpdates = accumulatorIds.map { id => -- cgit v1.2.3