author    Joshi <rekhajoshm@gmail.com>  2015-08-19 21:23:02 +0100
committer Sean Owen <sowen@cloudera.com>  2015-08-19 21:23:02 +0100
commit    f3391ff2b8b9c1f1308755dc223776692e3b7725 (patch)
tree      c22ab3c2f88673c7ecd933f8ebc001dbcffa2777 /core
parent    5b62bef8cbf73f910513ef3b1f557aa94b384854 (diff)
[SPARK-8889] [CORE] Fix for OOM for graph creation
Fix for OOM for graph creation

Author: Joshi <rekhajoshm@gmail.com>
Author: Rekha Joshi <rekhajoshm@gmail.com>

Closes #7602 from rekhajoshm/SPARK-8889.
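Editor's note on the fix: the OOM traced to how the DOT string for the DAG visualization was assembled. The old makeDotSubgraph allocated a fresh StringBuilder per cluster, returned its String, and let each parent copy that result into its own buffer, so deeply nested clusters kept many large intermediate strings alive and re-copied the same characters once per nesting level. The patch instead threads one shared StringBuilder through the recursion. A minimal standalone sketch of the two shapes, using a hypothetical Node tree in place of Spark's RDDOperationCluster:

import scala.collection.mutable.StringBuilder

// Hypothetical stand-in for RDDOperationCluster.
case class Node(id: Int, children: Seq[Node])

// Old shape: each call allocates a builder and returns a String that the
// parent then copies into its own builder, re-copying nested content at
// every level and holding intermediate strings until the parent finishes.
def renderOld(node: Node, indent: String): String = {
  val sb = new StringBuilder
  sb.append(indent + s"subgraph cluster${node.id} {\n")
  node.children.foreach { child => sb.append(renderOld(child, indent + "  ")) }
  sb.append(indent + "}\n")
  sb.toString()
}

// New shape (as in the patch): one shared builder, appended in place, so
// each character of the output is written exactly once.
def renderNew(sb: StringBuilder, node: Node, indent: String): Unit = {
  sb.append(indent).append(s"subgraph cluster${node.id} {\n")
  node.children.foreach { child => renderNew(sb, child, indent + "  ") }
  sb.append(indent).append("}\n")
}

For a graph with total output size n and nesting depth d, the old shape performs roughly O(n·d) character copies plus per-call transient allocations, while the new one stays at O(n) with a single buffer.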
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala | 23
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala | 39
2 files changed, 51 insertions(+), 11 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
index ffea9817c0..81f168a447 100644
--- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
+++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
@@ -18,7 +18,7 @@
package org.apache.spark.ui.scope
import scala.collection.mutable
-import scala.collection.mutable.ListBuffer
+import scala.collection.mutable.{StringBuilder, ListBuffer}
import org.apache.spark.Logging
import org.apache.spark.scheduler.StageInfo
@@ -167,7 +167,7 @@ private[ui] object RDDOperationGraph extends Logging {
def makeDotFile(graph: RDDOperationGraph): String = {
val dotFile = new StringBuilder
dotFile.append("digraph G {\n")
- dotFile.append(makeDotSubgraph(graph.rootCluster, indent = " "))
+ makeDotSubgraph(dotFile, graph.rootCluster, indent = " ")
graph.edges.foreach { edge => dotFile.append(s""" ${edge.fromId}->${edge.toId};\n""") }
dotFile.append("}")
val result = dotFile.toString()
@@ -180,18 +180,19 @@ private[ui] object RDDOperationGraph extends Logging {
s"""${node.id} [label="${node.name} [${node.id}]"]"""
}
- /** Return the dot representation of a subgraph in an RDDOperationGraph. */
- private def makeDotSubgraph(cluster: RDDOperationCluster, indent: String): String = {
- val subgraph = new StringBuilder
- subgraph.append(indent + s"subgraph cluster${cluster.id} {\n")
- subgraph.append(indent + s""" label="${cluster.name}";\n""")
+ /** Update the dot representation of the RDDOperationGraph in cluster to subgraph. */
+ private def makeDotSubgraph(
+ subgraph: StringBuilder,
+ cluster: RDDOperationCluster,
+ indent: String): Unit = {
+ subgraph.append(indent).append(s"subgraph cluster${cluster.id} {\n")
+ subgraph.append(indent).append(s""" label="${cluster.name}";\n""")
cluster.childNodes.foreach { node =>
- subgraph.append(indent + s" ${makeDotNode(node)};\n")
+ subgraph.append(indent).append(s" ${makeDotNode(node)};\n")
}
cluster.childClusters.foreach { cscope =>
- subgraph.append(makeDotSubgraph(cscope, indent + " "))
+ makeDotSubgraph(subgraph, cscope, indent + " ")
}
- subgraph.append(indent + "}\n")
- subgraph.toString()
+ subgraph.append(indent).append("}\n")
}
}
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index 3aa672f8b7..69888b2694 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.ui
import java.net.{HttpURLConnection, URL}
import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
+import scala.io.Source
import scala.collection.JavaConversions._
import scala.xml.Node
@@ -603,6 +604,44 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
}
}
+ test("job stages should have expected dotfile under DAG visualization") {
+ withSpark(newSparkContext()) { sc =>
+ // Create a multi-stage job
+ val rdd =
+ sc.parallelize(Seq(1, 2, 3)).map(identity).groupBy(identity).map(identity).groupBy(identity)
+ rdd.count()
+
+ val stage0 = Source.fromURL(sc.ui.get.appUIAddress +
+ "/stages/stage/?id=0&attempt=0&expandDagViz=true").mkString
+ assert(stage0.contains("digraph G {\n subgraph clusterstage_0 {\n " +
+ "label=&quot;Stage 0&quot;;\n subgraph "))
+ assert(stage0.contains("{\n label=&quot;parallelize&quot;;\n " +
+ "0 [label=&quot;ParallelCollectionRDD [0]&quot;];\n }"))
+ assert(stage0.contains("{\n label=&quot;map&quot;;\n " +
+ "1 [label=&quot;MapPartitionsRDD [1]&quot;];\n }"))
+ assert(stage0.contains("{\n label=&quot;groupBy&quot;;\n " +
+ "2 [label=&quot;MapPartitionsRDD [2]&quot;];\n }"))
+
+ val stage1 = Source.fromURL(sc.ui.get.appUIAddress +
+ "/stages/stage/?id=1&attempt=0&expandDagViz=true").mkString
+ assert(stage1.contains("digraph G {\n subgraph clusterstage_1 {\n " +
+ "label=&quot;Stage 1&quot;;\n subgraph "))
+ assert(stage1.contains("{\n label=&quot;groupBy&quot;;\n " +
+ "3 [label=&quot;ShuffledRDD [3]&quot;];\n }"))
+ assert(stage1.contains("{\n label=&quot;map&quot;;\n " +
+ "4 [label=&quot;MapPartitionsRDD [4]&quot;];\n }"))
+ assert(stage1.contains("{\n label=&quot;groupBy&quot;;\n " +
+ "5 [label=&quot;MapPartitionsRDD [5]&quot;];\n }"))
+
+ val stage2 = Source.fromURL(sc.ui.get.appUIAddress +
+ "/stages/stage/?id=2&attempt=0&expandDagViz=true").mkString
+ assert(stage2.contains("digraph G {\n subgraph clusterstage_2 {\n " +
+ "label=&quot;Stage 2&quot;;\n subgraph "))
+ assert(stage2.contains("{\n label=&quot;groupBy&quot;;\n " +
+ "6 [label=&quot;ShuffledRDD [6]&quot;];\n }"))
+ }
+ }
+
def goToUi(sc: SparkContext, path: String): Unit = {
goToUi(sc.ui.get, path)
}