-rw-r--r--  core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js                   2
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala                 6
-rw-r--r--  sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css  6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala            9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala         4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala           2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala       95
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala  10
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala      2
9 files changed, 104 insertions(+), 32 deletions(-)
diff --git a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
index 83dbea40b6..4337c42087 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
@@ -284,7 +284,7 @@ function renderDot(dot, container, forJob) {
renderer(container, g);
// Find the stage cluster and mark it for styling and post-processing
- container.selectAll("g.cluster[name*=\"Stage\"]").classed("stage", true);
+ container.selectAll("g.cluster[name^=\"Stage \"]").classed("stage", true);
}
/* -------------------- *
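Why this selector change matters for this patch: `[name*="Stage"]` is a substring match, so any cluster whose name merely contains "Stage" (such as the whole-stage-codegen clusters this commit introduces) would be styled as a stage cluster, whereas `[name^="Stage "]` matches only names that start with "Stage ". A minimal Scala sketch of the same distinction, using hypothetical cluster names:

    val names = Seq("Stage 1", "WholeStageCodegen")
    names.filter(_.contains("Stage"))     // Seq("Stage 1", "WholeStageCodegen") -- too broad
    names.filter(_.startsWith("Stage "))  // Seq("Stage 1") -- only real stage clusters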
diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
index 06da74f1b6..003c218aad 100644
--- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
+++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
@@ -130,7 +130,11 @@ private[ui] object RDDOperationGraph extends Logging {
}
}
// Attach the outermost cluster to the root cluster, and the RDD to the innermost cluster
- rddClusters.headOption.foreach { cluster => rootCluster.attachChildCluster(cluster) }
+ rddClusters.headOption.foreach { cluster =>
+ if (!rootCluster.childClusters.contains(cluster)) {
+ rootCluster.attachChildCluster(cluster)
+ }
+ }
rddClusters.lastOption.foreach { cluster => cluster.attachChildNode(node) }
}
}
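The new check makes attaching the outermost cluster idempotent: several RDDs in a stage can share the same scope chain, so this code may see the same outermost cluster more than once, and attaching it twice would duplicate the cluster in the rendered DAG. A minimal sketch of the call-site guard with a simplified stand-in class (the real RDDOperationCluster also carries a name and child nodes):

    import scala.collection.mutable

    class Cluster(val id: String) {
      private val _childClusters = mutable.ListBuffer[Cluster]()
      def childClusters: Seq[Cluster] = _childClusters.toSeq
      def attachChildCluster(c: Cluster): Unit = _childClusters += c
    }

    // Mirrors the change above: attach the cluster only if it is not
    // already a child, so repeated walks of the scope chain are harmless.
    def attachOnce(root: Cluster, cluster: Cluster): Unit = {
      if (!root.childClusters.contains(cluster)) {
        root.attachChildCluster(cluster)
      }
    }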
diff --git a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css
index ddd3a91dd8..303f8ebb88 100644
--- a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css
+++ b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css
@@ -20,6 +20,12 @@
text-shadow: none;
}
+#plan-viz-graph svg g.cluster rect {
+ fill: #A0DFFF;
+ stroke: #3EC0FF;
+ stroke-width: 1px;
+}
+
#plan-viz-graph svg g.node rect {
fill: #C3EBFF;
stroke: #3EC0FF;
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
index 4f750ad13a..4dd9928244 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
@@ -36,12 +36,17 @@ class SparkPlanInfo(
private[sql] object SparkPlanInfo {
def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
+ val children = plan match {
+ case WholeStageCodegen(child, _) => child :: Nil
+ case InputAdapter(child) => child :: Nil
+ case plan => plan.children
+ }
val metrics = plan.metrics.toSeq.map { case (key, metric) =>
new SQLMetricInfo(metric.name.getOrElse(key), metric.id,
Utils.getFormattedClassName(metric.param))
}
- val children = plan.children.map(fromSparkPlan)
- new SparkPlanInfo(plan.nodeName, plan.simpleString, children, plan.metadata, metrics)
+ new SparkPlanInfo(plan.nodeName, plan.simpleString, children.map(fromSparkPlan),
+ plan.metadata, metrics)
}
}
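Note that `WholeStageCodegen` carries a second constructor argument (ignored by the `_` in the pattern above), so plain `plan.children` would not yield the plan it wraps; the match records the wrapped plan explicitly, and `InputAdapter` gets the same single-child treatment. A hypothetical illustration of the resulting info tree for the query used in the new SQLMetricsSuite test, sqlContext.range(10).filter('id < 5), assuming whole-stage codegen applies:

    // physical plan:   WholeStageCodegen(Filter(Range))
    //
    // SparkPlanInfo tree (node names only):
    //   WholeStageCodegen
    //     +- Filter
    //          +- Range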
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
index c74ad40406..49915adf6c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
@@ -99,7 +99,7 @@ private[sql] class ExecutionPage(parent: SQLTab) extends WebUIPage("execution")
}
private def planVisualization(metrics: Map[Long, String], graph: SparkPlanGraph): Seq[Node] = {
- val metadata = graph.nodes.flatMap { node =>
+ val metadata = graph.allNodes.flatMap { node =>
val nodeId = s"plan-meta-data-${node.id}"
<div id={nodeId}>{node.desc}</div>
}
@@ -110,7 +110,7 @@ private[sql] class ExecutionPage(parent: SQLTab) extends WebUIPage("execution")
<div class="dot-file">
{graph.makeDotFile(metrics)}
</div>
- <div id="plan-viz-metadata-size">{graph.nodes.size.toString}</div>
+ <div id="plan-viz-metadata-size">{graph.allNodes.size.toString}</div>
{metadata}
</div>
{planVisualizationResources}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index cd56136927..83c64f755f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -219,7 +219,7 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
case SparkListenerSQLExecutionStart(executionId, description, details,
physicalPlanDescription, sparkPlanInfo, time) =>
val physicalPlanGraph = SparkPlanGraph(sparkPlanInfo)
- val sqlPlanMetrics = physicalPlanGraph.nodes.flatMap { node =>
+ val sqlPlanMetrics = physicalPlanGraph.allNodes.flatMap { node =>
node.metrics.map(metric => metric.accumulatorId -> metric)
}
val executionUIData = new SQLExecutionUIData(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
index 3a6eff9399..4eb248569b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable
-import org.apache.spark.sql.execution.SparkPlanInfo
+import org.apache.spark.sql.execution.{InputAdapter, SparkPlanInfo, WholeStageCodegen}
import org.apache.spark.sql.execution.metric.SQLMetrics
/**
@@ -41,6 +41,16 @@ private[ui] case class SparkPlanGraph(
dotFile.append("}")
dotFile.toString()
}
+
+ /**
+ * All the SparkPlanGraphNodes, including the nodes nested inside WholeStageCodegen clusters and the clusters themselves.
+ */
+ val allNodes: Seq[SparkPlanGraphNode] = {
+ nodes.flatMap {
+ case cluster: SparkPlanGraphCluster => cluster.nodes :+ cluster
+ case node => Seq(node)
+ }
+ }
}
private[sql] object SparkPlanGraph {
@@ -52,7 +62,7 @@ private[sql] object SparkPlanGraph {
val nodeIdGenerator = new AtomicLong(0)
val nodes = mutable.ArrayBuffer[SparkPlanGraphNode]()
val edges = mutable.ArrayBuffer[SparkPlanGraphEdge]()
- buildSparkPlanGraphNode(planInfo, nodeIdGenerator, nodes, edges)
+ buildSparkPlanGraphNode(planInfo, nodeIdGenerator, nodes, edges, null, null)
new SparkPlanGraph(nodes, edges)
}
@@ -60,22 +70,40 @@ private[sql] object SparkPlanGraph {
planInfo: SparkPlanInfo,
nodeIdGenerator: AtomicLong,
nodes: mutable.ArrayBuffer[SparkPlanGraphNode],
- edges: mutable.ArrayBuffer[SparkPlanGraphEdge]): SparkPlanGraphNode = {
- val metrics = planInfo.metrics.map { metric =>
- SQLPlanMetric(metric.name, metric.accumulatorId,
- SQLMetrics.getMetricParam(metric.metricParam))
+ edges: mutable.ArrayBuffer[SparkPlanGraphEdge],
+ parent: SparkPlanGraphNode,
+ subgraph: SparkPlanGraphCluster): Unit = {
+ if (planInfo.nodeName == classOf[WholeStageCodegen].getSimpleName) {
+ val cluster = new SparkPlanGraphCluster(
+ nodeIdGenerator.getAndIncrement(),
+ planInfo.nodeName,
+ planInfo.simpleString,
+ mutable.ArrayBuffer[SparkPlanGraphNode]())
+ nodes += cluster
+ buildSparkPlanGraphNode(
+ planInfo.children.head, nodeIdGenerator, nodes, edges, parent, cluster)
+ } else if (planInfo.nodeName == classOf[InputAdapter].getSimpleName) {
+ buildSparkPlanGraphNode(planInfo.children.head, nodeIdGenerator, nodes, edges, parent, null)
+ } else {
+ val metrics = planInfo.metrics.map { metric =>
+ SQLPlanMetric(metric.name, metric.accumulatorId,
+ SQLMetrics.getMetricParam(metric.metricParam))
+ }
+ val node = new SparkPlanGraphNode(
+ nodeIdGenerator.getAndIncrement(), planInfo.nodeName,
+ planInfo.simpleString, planInfo.metadata, metrics)
+ if (subgraph == null) {
+ nodes += node
+ } else {
+ subgraph.nodes += node
+ }
+
+ if (parent != null) {
+ edges += SparkPlanGraphEdge(node.id, parent.id)
+ }
+ planInfo.children.foreach(
+ buildSparkPlanGraphNode(_, nodeIdGenerator, nodes, edges, node, subgraph))
}
- val node = SparkPlanGraphNode(
- nodeIdGenerator.getAndIncrement(), planInfo.nodeName,
- planInfo.simpleString, planInfo.metadata, metrics)
-
- nodes += node
- val childrenNodes = planInfo.children.map(
- child => buildSparkPlanGraphNode(child, nodeIdGenerator, nodes, edges))
- for (child <- childrenNodes) {
- edges += SparkPlanGraphEdge(child.id, node.id)
- }
- node
}
}
@@ -86,12 +114,12 @@ private[sql] object SparkPlanGraph {
* @param name the name of this SparkPlan node
* @param metrics metrics that this SparkPlan node will track
*/
-private[ui] case class SparkPlanGraphNode(
- id: Long,
- name: String,
- desc: String,
- metadata: Map[String, String],
- metrics: Seq[SQLPlanMetric]) {
+private[ui] class SparkPlanGraphNode(
+ val id: Long,
+ val name: String,
+ val desc: String,
+ val metadata: Map[String, String],
+ val metrics: Seq[SQLPlanMetric]) {
def makeDotNode(metricsValue: Map[Long, String]): String = {
val builder = new mutable.StringBuilder(name)
@@ -118,6 +146,27 @@ private[ui] case class SparkPlanGraphNode(
}
/**
+ * Represents the tree of SparkPlan nodes fused by a WholeStageCodegen node, rendered as one cluster (DOT subgraph) in the UI.
+ */
+private[ui] class SparkPlanGraphCluster(
+ id: Long,
+ name: String,
+ desc: String,
+ val nodes: mutable.ArrayBuffer[SparkPlanGraphNode])
+ extends SparkPlanGraphNode(id, name, desc, Map.empty, Nil) {
+
+ override def makeDotNode(metricsValue: Map[Long, String]): String = {
+ s"""
+ | subgraph cluster${id} {
+ | label=${name};
+ | ${nodes.map(_.makeDotNode(metricsValue)).mkString(" \n")}
+ | }
+ """.stripMargin
+ }
+}
+
+
+/**
* Represent an edge in the SparkPlan tree. `fromId` is the child node id, and `toId` is the parent
* node id.
*/
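Taken together: a WholeStageCodegen node becomes a SparkPlanGraphCluster stored in `nodes` next to the top-level nodes, the operators it fuses go into `cluster.nodes`, InputAdapter nodes are elided from the graph, and `allNodes` flattens clusters and their members for metric lookup, which is why ExecutionPage, SQLListener, and the test suites below switch from `nodes` to `allNodes`. A minimal sketch (assuming access to these private[ui] classes; the node descriptions are made up) of the DOT fragment such a cluster emits:

    import scala.collection.mutable

    val filter = new SparkPlanGraphNode(1L, "Filter", "Filter (id < 5)", Map.empty, Nil)
    val range = new SparkPlanGraphNode(2L, "Range", "Range 0, 10", Map.empty, Nil)
    val cluster = new SparkPlanGraphCluster(
      0L, "WholeStageCodegen", "WholeStageCodegen", mutable.ArrayBuffer(range, filter))

    println(cluster.makeDotNode(Map.empty))
    //   subgraph cluster0 {
    //     label=WholeStageCodegen;
    //     <DOT for the Range and Filter nodes>
    //   }

dagre-d3 renders the subgraph as a box around its member nodes, which the new g.cluster rect CSS rule above fills with a lighter blue than the node rectangles.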
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 51285431a4..cbae19ebd2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -86,7 +86,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
// If we can track all jobs, check the metric values
val metricValues = sqlContext.listener.getExecutionMetrics(executionId)
val actualMetrics = SparkPlanGraph(SparkPlanInfo.fromSparkPlan(
- df.queryExecution.executedPlan)).nodes.filter { node =>
+ df.queryExecution.executedPlan)).allNodes.filter { node =>
expectedMetrics.contains(node.id)
}.map { node =>
val nodeMetrics = node.metrics.map { metric =>
@@ -134,6 +134,14 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
)
}
+ test("WholeStageCodegen metrics") {
+ // Assume the execution plan is
+ // WholeStageCodegen(nodeId = 0, Range(nodeId = 2) -> Filter(nodeId = 1))
+ // TODO: update metrics in generated operators
+ val df = sqlContext.range(10).filter('id < 5)
+ testSparkPlanMetrics(df, 1, Map.empty)
+ }
+
test("TungstenAggregate metrics") {
// Assume the execution plan is
// ... -> TungstenAggregate(nodeId = 2) -> Exchange(nodeId = 1)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index eef3c1f3e3..81a159d542 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -83,7 +83,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
val df = createTestDataFrame
val accumulatorIds =
SparkPlanGraph(SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan))
- .nodes.flatMap(_.metrics.map(_.accumulatorId))
+ .allNodes.flatMap(_.metrics.map(_.accumulatorId))
// Assume all accumulators are long
var accumulatorValue = 0L
val accumulatorUpdates = accumulatorIds.map { id =>