diff options
Diffstat (limited to 'core/src/main')
5 files changed, 166 insertions, 67 deletions
diff --git a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css index eedefb44b9..3b4ae2ed35 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css +++ b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css @@ -15,32 +15,21 @@ * limitations under the License. */ -#dag-viz-graph svg path { - stroke: #444; - stroke-width: 1.5px; -} - -#dag-viz-graph svg g.cluster rect { - stroke-width: 1px; -} - -#dag-viz-graph svg g.node circle { - fill: #444; +#dag-viz-graph a, #dag-viz-graph a:hover { + text-decoration: none; } -#dag-viz-graph svg g.node rect { - fill: #C3EBFF; - stroke: #3EC0FF; - stroke-width: 1px; +#dag-viz-graph .label { + font-weight: normal; + text-shadow: none; } -#dag-viz-graph svg g.node.cached circle { - fill: #444; +#dag-viz-graph svg path { + stroke: #444; + stroke-width: 1.5px; } -#dag-viz-graph svg g.node.cached rect { - fill: #B3F5C5; - stroke: #56F578; +#dag-viz-graph svg g.cluster rect { stroke-width: 1px; } @@ -61,12 +50,23 @@ stroke-width: 1px; } -#dag-viz-graph svg.job g.cluster[class*="stage"] rect { +#dag-viz-graph svg.job g.cluster.skipped rect { + fill: #D6D6D6; + stroke: #B7B7B7; + stroke-width: 1px; +} + +#dag-viz-graph svg.job g.cluster.stage rect { fill: #FFFFFF; stroke: #FF99AC; stroke-width: 1px; } +#dag-viz-graph svg.job g.cluster.stage.skipped rect { + stroke: #ADADAD; + stroke-width: 1px; +} + #dag-viz-graph svg.job g#cross-stage-edges path { fill: none; } @@ -75,6 +75,20 @@ fill: #333; } +#dag-viz-graph svg.job g.cluster.skipped text { + fill: #666; +} + +#dag-viz-graph svg.job g.node circle { + fill: #444; +} + +#dag-viz-graph svg.job g.node.cached circle { + fill: #A3F545; + stroke: #52C366; + stroke-width: 2px; +} + /* Stage page specific styles */ #dag-viz-graph svg.stage g.cluster rect { @@ -83,7 +97,7 @@ stroke-width: 1px; } -#dag-viz-graph svg.stage g.cluster[class*="stage"] rect { +#dag-viz-graph svg.stage g.cluster.stage rect { fill: #FFFFFF; stroke: #FFA6B6; stroke-width: 1px; @@ -97,11 +111,14 @@ fill: #333; } -#dag-viz-graph a, #dag-viz-graph a:hover { - text-decoration: none; +#dag-viz-graph svg.stage g.node rect { + fill: #C3EBFF; + stroke: #3EC0FF; + stroke-width: 1px; } -#dag-viz-graph .label { - font-weight: normal; - text-shadow: none; +#dag-viz-graph svg.stage g.node.cached rect { + fill: #B3F5C5; + stroke: #52C366; + stroke-width: 2px; } diff --git a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js index ee48fd29a6..aaeba5b102 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js +++ b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js @@ -57,9 +57,7 @@ var VizConstants = { stageSep: 40, graphPrefix: "graph_", nodePrefix: "node_", - stagePrefix: "stage_", - clusterPrefix: "cluster_", - stageClusterPrefix: "cluster_stage_" + clusterPrefix: "cluster_" }; var JobPageVizConstants = { @@ -133,9 +131,7 @@ function renderDagViz(forJob) { } // Render - var svg = graphContainer() - .append("svg") - .attr("class", jobOrStage); + var svg = graphContainer().append("svg").attr("class", jobOrStage); if (forJob) { renderDagVizForJob(svg); } else { @@ -185,23 +181,32 @@ function renderDagVizForJob(svgContainer) { var dot = metadata.select(".dot-file").text(); var stageId = metadata.attr("stage-id"); var containerId = VizConstants.graphPrefix + stageId; - // Link each graph to the corresponding stage page (TODO: handle stage attempts) - var stageLink = $("#stage-" + stageId.replace(VizConstants.stagePrefix, "") + "-0") - .find("a") - .attr("href") + "&expandDagViz=true"; - var container = svgContainer - .append("a") - .attr("xlink:href", stageLink) - .append("g") - .attr("id", containerId); + var isSkipped = metadata.attr("skipped") == "true"; + var container; + if (isSkipped) { + container = svgContainer + .append("g") + .attr("id", containerId) + .attr("skipped", "true"); + } else { + // Link each graph to the corresponding stage page (TODO: handle stage attempts) + // Use the link from the stage table so it also works for the history server + var attemptId = 0 + var stageLink = d3.select("#stage-" + stageId + "-" + attemptId) + .select("a") + .attr("href") + "&expandDagViz=true"; + container = svgContainer + .append("a") + .attr("xlink:href", stageLink) + .append("g") + .attr("id", containerId); + } // Now we need to shift the container for this stage so it doesn't overlap with // existing ones, taking into account the position and width of the last stage's // container. We do not need to do this for the first stage of this job. if (i > 0) { - var existingStages = svgContainer - .selectAll("g.cluster") - .filter("[class*=\"" + VizConstants.stageClusterPrefix + "\"]"); + var existingStages = svgContainer.selectAll("g.cluster.stage") if (!existingStages.empty()) { var lastStage = d3.select(existingStages[0].pop()); var lastStageWidth = toFloat(lastStage.select("rect").attr("width")); @@ -214,6 +219,12 @@ function renderDagVizForJob(svgContainer) { // Actually render the stage renderDot(dot, container, true); + // Mark elements as skipped if appropriate. Unfortunately we need to mark all + // elements instead of the parent container because of CSS override rules. + if (isSkipped) { + container.selectAll("g").classed("skipped", true); + } + // Round corners on rectangles container .selectAll("rect") @@ -243,6 +254,9 @@ function renderDot(dot, container, forJob) { var renderer = new dagreD3.render(); preprocessGraphLayout(g, forJob); renderer(container, g); + + // Find the stage cluster and mark it for styling and post-processing + container.selectAll("g.cluster[name*=\"Stage\"]").classed("stage", true); } /* -------------------- * diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index ad16becde8..6194c50ec8 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -352,10 +352,12 @@ private[spark] object UIUtils extends Logging { </a> </span> <div id="dag-viz-graph"></div> - <div id="dag-viz-metadata"> + <div id="dag-viz-metadata" style="display:none"> { graphs.map { g => - <div class="stage-metadata" stage-id={g.rootCluster.id} style="display:none"> + val stageId = g.rootCluster.id.replaceAll(RDDOperationGraph.STAGE_CLUSTER_PREFIX, "") + val skipped = g.rootCluster.name.contains("skipped").toString + <div class="stage-metadata" stage-id={stageId} skipped={skipped}> <div class="dot-file">{RDDOperationGraph.makeDotFile(g)}</div> { g.incomingEdges.map { e => <div class="incoming-edge">{e.fromId},{e.toId}</div> } } { g.outgoingEdges.map { e => <div class="outgoing-edge">{e.fromId},{e.toId}</div> } } diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala index 25d5c6ff7e..33a7303be7 100644 --- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala +++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala @@ -52,10 +52,13 @@ private[ui] case class RDDOperationEdge(fromId: Int, toId: Int) * This represents any grouping of RDDs, including operation scopes (e.g. textFile, flatMap), * stages, jobs, or any higher level construct. A cluster may be nested inside of other clusters. */ -private[ui] class RDDOperationCluster(val id: String, val name: String) { +private[ui] class RDDOperationCluster(val id: String, private var _name: String) { private val _childNodes = new ListBuffer[RDDOperationNode] private val _childClusters = new ListBuffer[RDDOperationCluster] + def name: String = _name + def setName(n: String): Unit = { _name = n } + def childNodes: Seq[RDDOperationNode] = _childNodes.iterator.toSeq def childClusters: Seq[RDDOperationCluster] = _childClusters.iterator.toSeq def attachChildNode(childNode: RDDOperationNode): Unit = { _childNodes += childNode } @@ -71,6 +74,8 @@ private[ui] class RDDOperationCluster(val id: String, val name: String) { private[ui] object RDDOperationGraph extends Logging { + val STAGE_CLUSTER_PREFIX = "stage_" + /** * Construct a RDDOperationGraph for a given stage. * @@ -88,7 +93,8 @@ private[ui] object RDDOperationGraph extends Logging { val clusters = new mutable.HashMap[String, RDDOperationCluster] // indexed by cluster ID // Root cluster is the stage cluster - val stageClusterId = s"stage_${stage.stageId}" + // Use a special prefix here to differentiate this cluster from other operation clusters + val stageClusterId = STAGE_CLUSTER_PREFIX + stage.stageId val stageClusterName = s"Stage ${stage.stageId}" + { if (stage.attemptId == 0) "" else s" (attempt ${stage.attemptId})" } val rootCluster = new RDDOperationCluster(stageClusterId, stageClusterName) diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala index aa9c25cb5c..89119cd357 100644 --- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala @@ -27,8 +27,15 @@ import org.apache.spark.ui.SparkUI * A SparkListener that constructs a DAG of RDD operations. */ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListener { + + // Note: the fate of jobs and stages are tied. This means when we clean up a job, + // we always clean up all of its stages. Similarly, when we clean up a stage, we + // always clean up its job (and, transitively, other stages in the same job). private[ui] val jobIdToStageIds = new mutable.HashMap[Int, Seq[Int]] + private[ui] val jobIdToSkippedStageIds = new mutable.HashMap[Int, Seq[Int]] + private[ui] val stageIdToJobId = new mutable.HashMap[Int, Int] private[ui] val stageIdToGraph = new mutable.HashMap[Int, RDDOperationGraph] + private[ui] val completedStageIds = new mutable.HashSet[Int] // Keep track of the order in which these are inserted so we can remove old ones private[ui] val jobIds = new mutable.ArrayBuffer[Int] @@ -40,16 +47,23 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListen private val retainedStages = conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES) - /** Return the graph metadata for the given stage, or None if no such information exists. */ + /** + * Return the graph metadata for all stages in the given job. + * An empty list is returned if one or more of its stages has been cleaned up. + */ def getOperationGraphForJob(jobId: Int): Seq[RDDOperationGraph] = synchronized { - val _stageIds = jobIdToStageIds.get(jobId).getOrElse { Seq.empty } - val graphs = _stageIds.flatMap { sid => stageIdToGraph.get(sid) } - // If the metadata for some stages have been removed, do not bother rendering this job - if (_stageIds.size != graphs.size) { - Seq.empty - } else { - graphs + val skippedStageIds = jobIdToSkippedStageIds.get(jobId).getOrElse(Seq.empty) + val graphs = jobIdToStageIds.get(jobId) + .getOrElse(Seq.empty) + .flatMap { sid => stageIdToGraph.get(sid) } + // Mark any skipped stages as such + graphs.foreach { g => + val stageId = g.rootCluster.id.replaceAll(RDDOperationGraph.STAGE_CLUSTER_PREFIX, "").toInt + if (skippedStageIds.contains(stageId) && !g.rootCluster.name.contains("skipped")) { + g.rootCluster.setName(g.rootCluster.name + " (skipped)") + } } + graphs } /** Return the graph metadata for the given stage, or None if no such information exists. */ @@ -66,22 +80,68 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListen jobIdToStageIds(jobId) = jobStart.stageInfos.map(_.stageId).sorted stageInfos.foreach { stageInfo => - stageIds += stageInfo.stageId - stageIdToGraph(stageInfo.stageId) = RDDOperationGraph.makeOperationGraph(stageInfo) - // Remove state for old stages - if (stageIds.size >= retainedStages) { - val toRemove = math.max(retainedStages / 10, 1) - stageIds.take(toRemove).foreach { id => stageIdToGraph.remove(id) } - stageIds.trimStart(toRemove) - } + val stageId = stageInfo.stageId + stageIds += stageId + stageIdToJobId(stageId) = jobId + stageIdToGraph(stageId) = RDDOperationGraph.makeOperationGraph(stageInfo) + trimStagesIfNecessary() + } + + trimJobsIfNecessary() + } + + /** Keep track of stages that have completed. */ + override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = synchronized { + val stageId = stageCompleted.stageInfo.stageId + if (stageIdToJobId.contains(stageId)) { + // Note: Only do this if the stage has not already been cleaned up + // Otherwise, we may never clean this stage from `completedStageIds` + completedStageIds += stageCompleted.stageInfo.stageId + } + } + + /** On job end, find all stages in this job that are skipped and mark them as such. */ + override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = synchronized { + val jobId = jobEnd.jobId + jobIdToStageIds.get(jobId).foreach { stageIds => + val skippedStageIds = stageIds.filter { sid => !completedStageIds.contains(sid) } + // Note: Only do this if the job has not already been cleaned up + // Otherwise, we may never clean this job from `jobIdToSkippedStageIds` + jobIdToSkippedStageIds(jobId) = skippedStageIds } + } + + /** Clean metadata for old stages if we have exceeded the number to retain. */ + private def trimStagesIfNecessary(): Unit = { + if (stageIds.size >= retainedStages) { + val toRemove = math.max(retainedStages / 10, 1) + stageIds.take(toRemove).foreach { id => cleanStage(id) } + stageIds.trimStart(toRemove) + } + } - // Remove state for old jobs + /** Clean metadata for old jobs if we have exceeded the number to retain. */ + private def trimJobsIfNecessary(): Unit = { if (jobIds.size >= retainedJobs) { val toRemove = math.max(retainedJobs / 10, 1) - jobIds.take(toRemove).foreach { id => jobIdToStageIds.remove(id) } + jobIds.take(toRemove).foreach { id => cleanJob(id) } jobIds.trimStart(toRemove) } } + /** Clean metadata for the given stage, its job, and all other stages that belong to the job. */ + private[ui] def cleanStage(stageId: Int): Unit = { + completedStageIds.remove(stageId) + stageIdToGraph.remove(stageId) + stageIdToJobId.remove(stageId).foreach { jobId => cleanJob(jobId) } + } + + /** Clean metadata for the given job and all stages that belong to it. */ + private[ui] def cleanJob(jobId: Int): Unit = { + jobIdToSkippedStageIds.remove(jobId) + jobIdToStageIds.remove(jobId).foreach { stageIds => + stageIds.foreach { stageId => cleanStage(stageId) } + } + } + } |