From f3391ff2b8b9c1f1308755dc223776692e3b7725 Mon Sep 17 00:00:00 2001 From: Joshi Date: Wed, 19 Aug 2015 21:23:02 +0100 Subject: [SPARK-8889] [CORE] Fix for OOM for graph creation Fix for OOM for graph creation Author: Joshi Author: Rekha Joshi Closes #7602 from rekhajoshm/SPARK-8889. --- .../apache/spark/ui/scope/RDDOperationGraph.scala | 23 +++++++------ .../org/apache/spark/ui/UISeleniumSuite.scala | 39 ++++++++++++++++++++++ 2 files changed, 51 insertions(+), 11 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala index ffea9817c0..81f168a447 100644 --- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala +++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala @@ -18,7 +18,7 @@ package org.apache.spark.ui.scope import scala.collection.mutable -import scala.collection.mutable.ListBuffer +import scala.collection.mutable.{StringBuilder, ListBuffer} import org.apache.spark.Logging import org.apache.spark.scheduler.StageInfo @@ -167,7 +167,7 @@ private[ui] object RDDOperationGraph extends Logging { def makeDotFile(graph: RDDOperationGraph): String = { val dotFile = new StringBuilder dotFile.append("digraph G {\n") - dotFile.append(makeDotSubgraph(graph.rootCluster, indent = " ")) + makeDotSubgraph(dotFile, graph.rootCluster, indent = " ") graph.edges.foreach { edge => dotFile.append(s""" ${edge.fromId}->${edge.toId};\n""") } dotFile.append("}") val result = dotFile.toString() @@ -180,18 +180,19 @@ private[ui] object RDDOperationGraph extends Logging { s"""${node.id} [label="${node.name} [${node.id}]"]""" } - /** Return the dot representation of a subgraph in an RDDOperationGraph. 
*/ - private def makeDotSubgraph(cluster: RDDOperationCluster, indent: String): String = { - val subgraph = new StringBuilder - subgraph.append(indent + s"subgraph cluster${cluster.id} {\n") - subgraph.append(indent + s""" label="${cluster.name}";\n""") + /** Update the dot representation of the RDDOperationGraph in cluster to subgraph. */ + private def makeDotSubgraph( + subgraph: StringBuilder, + cluster: RDDOperationCluster, + indent: String): Unit = { + subgraph.append(indent).append(s"subgraph cluster${cluster.id} {\n") + subgraph.append(indent).append(s""" label="${cluster.name}";\n""") cluster.childNodes.foreach { node => - subgraph.append(indent + s" ${makeDotNode(node)};\n") + subgraph.append(indent).append(s" ${makeDotNode(node)};\n") } cluster.childClusters.foreach { cscope => - subgraph.append(makeDotSubgraph(cscope, indent + " ")) + makeDotSubgraph(subgraph, cscope, indent + " ") } - subgraph.append(indent + "}\n") - subgraph.toString() + subgraph.append(indent).append("}\n") } } diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala index 3aa672f8b7..69888b2694 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.ui import java.net.{HttpURLConnection, URL} import javax.servlet.http.{HttpServletResponse, HttpServletRequest} +import scala.io.Source import scala.collection.JavaConversions._ import scala.xml.Node @@ -603,6 +604,44 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B } } + test("job stages should have expected dotfile under DAG visualization") { + withSpark(newSparkContext()) { sc => + // Create a multi-stage job + val rdd = + sc.parallelize(Seq(1, 2, 3)).map(identity).groupBy(identity).map(identity).groupBy(identity) + rdd.count() + + val stage0 = Source.fromURL(sc.ui.get.appUIAddress + + 
"/stages/stage/?id=0&attempt=0&expandDagViz=true").mkString + assert(stage0.contains("digraph G {\n subgraph clusterstage_0 {\n " + + "label=\"Stage 0\";\n subgraph ")) + assert(stage0.contains("{\n label=\"parallelize\";\n " + + "0 [label=\"ParallelCollectionRDD [0]\"];\n }")) + assert(stage0.contains("{\n label=\"map\";\n " + + "1 [label=\"MapPartitionsRDD [1]\"];\n }")) + assert(stage0.contains("{\n label=\"groupBy\";\n " + + "2 [label=\"MapPartitionsRDD [2]\"];\n }")) + + val stage1 = Source.fromURL(sc.ui.get.appUIAddress + + "/stages/stage/?id=1&attempt=0&expandDagViz=true").mkString + assert(stage1.contains("digraph G {\n subgraph clusterstage_1 {\n " + + "label=\"Stage 1\";\n subgraph ")) + assert(stage1.contains("{\n label=\"groupBy\";\n " + + "3 [label=\"ShuffledRDD [3]\"];\n }")) + assert(stage1.contains("{\n label=\"map\";\n " + + "4 [label=\"MapPartitionsRDD [4]\"];\n }")) + assert(stage1.contains("{\n label=\"groupBy\";\n " + + "5 [label=\"MapPartitionsRDD [5]\"];\n }")) + + val stage2 = Source.fromURL(sc.ui.get.appUIAddress + + "/stages/stage/?id=2&attempt=0&expandDagViz=true").mkString + assert(stage2.contains("digraph G {\n subgraph clusterstage_2 {\n " + + "label=\"Stage 2\";\n subgraph ")) + assert(stage2.contains("{\n label=\"groupBy\";\n " + + "6 [label=\"ShuffledRDD [6]\"];\n }")) + } + } + def goToUi(sc: SparkContext, path: String): Unit = { goToUi(sc.ui.get, path) } -- cgit v1.2.3