Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala | 23
-rw-r--r-- core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala | 39
2 files changed, 51 insertions(+), 11 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
index ffea9817c0..81f168a447 100644
--- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
+++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.ui.scope
 
 import scala.collection.mutable
-import scala.collection.mutable.ListBuffer
+import scala.collection.mutable.{StringBuilder, ListBuffer}
 
 import org.apache.spark.Logging
 import org.apache.spark.scheduler.StageInfo
@@ -167,7 +167,7 @@ private[ui] object RDDOperationGraph extends Logging {
   def makeDotFile(graph: RDDOperationGraph): String = {
     val dotFile = new StringBuilder
     dotFile.append("digraph G {\n")
-    dotFile.append(makeDotSubgraph(graph.rootCluster, indent = "  "))
+    makeDotSubgraph(dotFile, graph.rootCluster, indent = "  ")
     graph.edges.foreach { edge => dotFile.append(s"""  ${edge.fromId}->${edge.toId};\n""") }
     dotFile.append("}")
     val result = dotFile.toString()
@@ -180,18 +180,19 @@ private[ui] object RDDOperationGraph extends Logging {
     s"""${node.id} [label="${node.name} [${node.id}]"]"""
   }
 
-  /** Return the dot representation of a subgraph in an RDDOperationGraph. */
-  private def makeDotSubgraph(cluster: RDDOperationCluster, indent: String): String = {
-    val subgraph = new StringBuilder
-    subgraph.append(indent + s"subgraph cluster${cluster.id} {\n")
-    subgraph.append(indent + s"""  label="${cluster.name}";\n""")
+  /** Append the dot representation of cluster (a subgraph of an RDDOperationGraph) to subgraph. */
+  private def makeDotSubgraph(
+      subgraph: StringBuilder,
+      cluster: RDDOperationCluster,
+      indent: String): Unit = {
+    subgraph.append(indent).append(s"subgraph cluster${cluster.id} {\n")
+    subgraph.append(indent).append(s"""  label="${cluster.name}";\n""")
     cluster.childNodes.foreach { node =>
-      subgraph.append(indent + s"  ${makeDotNode(node)};\n")
+      subgraph.append(indent).append(s"  ${makeDotNode(node)};\n")
     }
     cluster.childClusters.foreach { cscope =>
-      subgraph.append(makeDotSubgraph(cscope, indent + "  "))
+      makeDotSubgraph(subgraph, cscope, indent + "  ")
     }
-    subgraph.append(indent + "}\n")
-    subgraph.toString()
+    subgraph.append(indent).append("}\n")
   }
 }
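
The change above swaps a build-and-return recursion for an append-in-place one: instead of every recursive call allocating its own StringBuilder, rendering it to a String, and having the parent copy that String into yet another builder, a single StringBuilder is threaded through the recursion and each cluster appends directly to it. A minimal, self-contained sketch of the pattern (Cluster and appendSubgraph are illustrative names, not part of this patch):

import scala.collection.mutable.StringBuilder

case class Cluster(id: String, name: String, children: Seq[Cluster])

// Append the dot text for `c` and its children to `out`, going one
// indent level deeper per nesting level, as makeDotSubgraph does above.
def appendSubgraph(out: StringBuilder, c: Cluster, indent: String): Unit = {
  out.append(indent).append(s"subgraph cluster${c.id} {\n")
  out.append(indent).append(s"""  label="${c.name}";\n""")
  c.children.foreach(appendSubgraph(out, _, indent + "  "))
  out.append(indent).append("}\n")
}

val sb = new StringBuilder("digraph G {\n")
appendSubgraph(sb, Cluster("stage_0", "Stage 0", Seq(Cluster("0", "parallelize", Nil))), "  ")
println(sb.append("}").toString())
// digraph G {
//   subgraph clusterstage_0 {
//     label="Stage 0";
//     subgraph cluster0 {
//       label="parallelize";
//     }
//   }
// }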
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index 3aa672f8b7..69888b2694 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.ui
 import java.net.{HttpURLConnection, URL}
 import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
 
+import scala.io.Source
 import scala.collection.JavaConversions._
 import scala.xml.Node
 
@@ -603,6 +604,44 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
     }
   }
 
+ test("job stages should have expected dotfile under DAG visualization") {
+ withSpark(newSparkContext()) { sc =>
+ // Create a multi-stage job
+ val rdd =
+ sc.parallelize(Seq(1, 2, 3)).map(identity).groupBy(identity).map(identity).groupBy(identity)
+ rdd.count()
+
+ val stage0 = Source.fromURL(sc.ui.get.appUIAddress +
+ "/stages/stage/?id=0&attempt=0&expandDagViz=true").mkString
+ assert(stage0.contains("digraph G {\n subgraph clusterstage_0 {\n " +
+ "label="Stage 0";\n subgraph "))
+ assert(stage0.contains("{\n label="parallelize";\n " +
+ "0 [label="ParallelCollectionRDD [0]"];\n }"))
+ assert(stage0.contains("{\n label="map";\n " +
+ "1 [label="MapPartitionsRDD [1]"];\n }"))
+ assert(stage0.contains("{\n label="groupBy";\n " +
+ "2 [label="MapPartitionsRDD [2]"];\n }"))
+
+ val stage1 = Source.fromURL(sc.ui.get.appUIAddress +
+ "/stages/stage/?id=1&attempt=0&expandDagViz=true").mkString
+ assert(stage1.contains("digraph G {\n subgraph clusterstage_1 {\n " +
+ "label="Stage 1";\n subgraph "))
+ assert(stage1.contains("{\n label="groupBy";\n " +
+ "3 [label="ShuffledRDD [3]"];\n }"))
+ assert(stage1.contains("{\n label="map";\n " +
+ "4 [label="MapPartitionsRDD [4]"];\n }"))
+ assert(stage1.contains("{\n label="groupBy";\n " +
+ "5 [label="MapPartitionsRDD [5]"];\n }"))
+
+ val stage2 = Source.fromURL(sc.ui.get.appUIAddress +
+ "/stages/stage/?id=2&attempt=0&expandDagViz=true").mkString
+ assert(stage2.contains("digraph G {\n subgraph clusterstage_2 {\n " +
+ "label="Stage 2";\n subgraph "))
+ assert(stage2.contains("{\n label="groupBy";\n " +
+ "6 [label="ShuffledRDD [6]"];\n }"))
+ }
+ }
+
   def goToUi(sc: SparkContext, path: String): Unit = {
     goToUi(sc.ui.get, path)
   }
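
One detail worth noting about the assertions: they match label=&quot;Stage 0&quot; rather than a bare label="Stage 0" because Source.fromURL reads the stage page's rendered HTML, in which the double quotes of the embedded dot source are escaped as entities. A small sketch of that escaping (htmlEscape here is a stand-in for whatever the UI rendering layer does, not a Spark API):

// Stand-in for the HTML escaping applied when the dot source is
// embedded in the stage page; illustrative only.
def htmlEscape(s: String): String =
  s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;").replace("\"", "&quot;")

val dot = "subgraph clusterstage_0 {\n  label=\"Stage 0\";\n"
println(htmlEscape(dot))
// subgraph clusterstage_0 {
//   label=&quot;Stage 0&quot;;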