aboutsummaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
authorAndrew Or <andrew@databricks.com>2015-05-18 10:59:35 -0700
committerAndrew Or <andrew@databricks.com>2015-05-18 10:59:35 -0700
commit563bfcc1ab1b1c79b1845230c8c600db85a08fe3 (patch)
treeb5edd897a8034afb0666c4913e26637b472e255a /core/src/main
parent814b3dabdf01abc7a2f25aa32284caccadeb7798 (diff)
downloadspark-563bfcc1ab1b1c79b1845230c8c600db85a08fe3.tar.gz
spark-563bfcc1ab1b1c79b1845230c8c600db85a08fe3.tar.bz2
spark-563bfcc1ab1b1c79b1845230c8c600db85a08fe3.zip
[SPARK-7627] [SPARK-7472] DAG visualization: style skipped stages
This patch fixes two things: **SPARK-7627.** Cached RDDs no longer light up on the job page. This is a simple fix. **SPARK-7472.** Display skipped stages differently from normal stages. The latter is a major UX issue. Because we link the job viz to the stage viz even for skipped stages, the user may inadvertently click into the stage page of a skipped stage, which is empty. ------------------- <img src="https://cloud.githubusercontent.com/assets/2133137/7675241/de1a3da6-fcea-11e4-8101-88055cef78c5.png" width="300px" /> Author: Andrew Or <andrew@databricks.com> Closes #6171 from andrewor14/dag-viz-skipped and squashes the following commits: f261797 [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-skipped 0eda358 [Andrew Or] Tweak skipped stage border color c604150 [Andrew Or] Tweak grayscale colors 7010676 [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-skipped 762b541 [Andrew Or] Use special prefix for stage clusters to avoid collisions 51c95b9 [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-skipped b928cd4 [Andrew Or] Fix potential leak + write tests for it 7c4c364 [Andrew Or] Show skipped stages differently 7cc34ce [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-skipped c121fa2 [Andrew Or] Fix cache color
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css71
-rw-r--r--core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js50
-rw-r--r--core/src/main/scala/org/apache/spark/ui/UIUtils.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala96
5 files changed, 166 insertions, 67 deletions
diff --git a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css
index eedefb44b9..3b4ae2ed35 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css
+++ b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css
@@ -15,32 +15,21 @@
* limitations under the License.
*/
-#dag-viz-graph svg path {
- stroke: #444;
- stroke-width: 1.5px;
-}
-
-#dag-viz-graph svg g.cluster rect {
- stroke-width: 1px;
-}
-
-#dag-viz-graph svg g.node circle {
- fill: #444;
+#dag-viz-graph a, #dag-viz-graph a:hover {
+ text-decoration: none;
}
-#dag-viz-graph svg g.node rect {
- fill: #C3EBFF;
- stroke: #3EC0FF;
- stroke-width: 1px;
+#dag-viz-graph .label {
+ font-weight: normal;
+ text-shadow: none;
}
-#dag-viz-graph svg g.node.cached circle {
- fill: #444;
+#dag-viz-graph svg path {
+ stroke: #444;
+ stroke-width: 1.5px;
}
-#dag-viz-graph svg g.node.cached rect {
- fill: #B3F5C5;
- stroke: #56F578;
+#dag-viz-graph svg g.cluster rect {
stroke-width: 1px;
}
@@ -61,12 +50,23 @@
stroke-width: 1px;
}
-#dag-viz-graph svg.job g.cluster[class*="stage"] rect {
+#dag-viz-graph svg.job g.cluster.skipped rect {
+ fill: #D6D6D6;
+ stroke: #B7B7B7;
+ stroke-width: 1px;
+}
+
+#dag-viz-graph svg.job g.cluster.stage rect {
fill: #FFFFFF;
stroke: #FF99AC;
stroke-width: 1px;
}
+#dag-viz-graph svg.job g.cluster.stage.skipped rect {
+ stroke: #ADADAD;
+ stroke-width: 1px;
+}
+
#dag-viz-graph svg.job g#cross-stage-edges path {
fill: none;
}
@@ -75,6 +75,20 @@
fill: #333;
}
+#dag-viz-graph svg.job g.cluster.skipped text {
+ fill: #666;
+}
+
+#dag-viz-graph svg.job g.node circle {
+ fill: #444;
+}
+
+#dag-viz-graph svg.job g.node.cached circle {
+ fill: #A3F545;
+ stroke: #52C366;
+ stroke-width: 2px;
+}
+
/* Stage page specific styles */
#dag-viz-graph svg.stage g.cluster rect {
@@ -83,7 +97,7 @@
stroke-width: 1px;
}
-#dag-viz-graph svg.stage g.cluster[class*="stage"] rect {
+#dag-viz-graph svg.stage g.cluster.stage rect {
fill: #FFFFFF;
stroke: #FFA6B6;
stroke-width: 1px;
@@ -97,11 +111,14 @@
fill: #333;
}
-#dag-viz-graph a, #dag-viz-graph a:hover {
- text-decoration: none;
+#dag-viz-graph svg.stage g.node rect {
+ fill: #C3EBFF;
+ stroke: #3EC0FF;
+ stroke-width: 1px;
}
-#dag-viz-graph .label {
- font-weight: normal;
- text-shadow: none;
+#dag-viz-graph svg.stage g.node.cached rect {
+ fill: #B3F5C5;
+ stroke: #52C366;
+ stroke-width: 2px;
}
diff --git a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
index ee48fd29a6..aaeba5b102 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
@@ -57,9 +57,7 @@ var VizConstants = {
stageSep: 40,
graphPrefix: "graph_",
nodePrefix: "node_",
- stagePrefix: "stage_",
- clusterPrefix: "cluster_",
- stageClusterPrefix: "cluster_stage_"
+ clusterPrefix: "cluster_"
};
var JobPageVizConstants = {
@@ -133,9 +131,7 @@ function renderDagViz(forJob) {
}
// Render
- var svg = graphContainer()
- .append("svg")
- .attr("class", jobOrStage);
+ var svg = graphContainer().append("svg").attr("class", jobOrStage);
if (forJob) {
renderDagVizForJob(svg);
} else {
@@ -185,23 +181,32 @@ function renderDagVizForJob(svgContainer) {
var dot = metadata.select(".dot-file").text();
var stageId = metadata.attr("stage-id");
var containerId = VizConstants.graphPrefix + stageId;
- // Link each graph to the corresponding stage page (TODO: handle stage attempts)
- var stageLink = $("#stage-" + stageId.replace(VizConstants.stagePrefix, "") + "-0")
- .find("a")
- .attr("href") + "&expandDagViz=true";
- var container = svgContainer
- .append("a")
- .attr("xlink:href", stageLink)
- .append("g")
- .attr("id", containerId);
+ var isSkipped = metadata.attr("skipped") == "true";
+ var container;
+ if (isSkipped) {
+ container = svgContainer
+ .append("g")
+ .attr("id", containerId)
+ .attr("skipped", "true");
+ } else {
+ // Link each graph to the corresponding stage page (TODO: handle stage attempts)
+ // Use the link from the stage table so it also works for the history server
+ var attemptId = 0
+ var stageLink = d3.select("#stage-" + stageId + "-" + attemptId)
+ .select("a")
+ .attr("href") + "&expandDagViz=true";
+ container = svgContainer
+ .append("a")
+ .attr("xlink:href", stageLink)
+ .append("g")
+ .attr("id", containerId);
+ }
// Now we need to shift the container for this stage so it doesn't overlap with
// existing ones, taking into account the position and width of the last stage's
// container. We do not need to do this for the first stage of this job.
if (i > 0) {
- var existingStages = svgContainer
- .selectAll("g.cluster")
- .filter("[class*=\"" + VizConstants.stageClusterPrefix + "\"]");
+ var existingStages = svgContainer.selectAll("g.cluster.stage")
if (!existingStages.empty()) {
var lastStage = d3.select(existingStages[0].pop());
var lastStageWidth = toFloat(lastStage.select("rect").attr("width"));
@@ -214,6 +219,12 @@ function renderDagVizForJob(svgContainer) {
// Actually render the stage
renderDot(dot, container, true);
+ // Mark elements as skipped if appropriate. Unfortunately we need to mark all
+ // elements instead of the parent container because of CSS override rules.
+ if (isSkipped) {
+ container.selectAll("g").classed("skipped", true);
+ }
+
// Round corners on rectangles
container
.selectAll("rect")
@@ -243,6 +254,9 @@ function renderDot(dot, container, forJob) {
var renderer = new dagreD3.render();
preprocessGraphLayout(g, forJob);
renderer(container, g);
+
+ // Find the stage cluster and mark it for styling and post-processing
+ container.selectAll("g.cluster[name*=\"Stage\"]").classed("stage", true);
}
/* -------------------- *
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index ad16becde8..6194c50ec8 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -352,10 +352,12 @@ private[spark] object UIUtils extends Logging {
</a>
</span>
<div id="dag-viz-graph"></div>
- <div id="dag-viz-metadata">
+ <div id="dag-viz-metadata" style="display:none">
{
graphs.map { g =>
- <div class="stage-metadata" stage-id={g.rootCluster.id} style="display:none">
+ val stageId = g.rootCluster.id.replaceAll(RDDOperationGraph.STAGE_CLUSTER_PREFIX, "")
+ val skipped = g.rootCluster.name.contains("skipped").toString
+ <div class="stage-metadata" stage-id={stageId} skipped={skipped}>
<div class="dot-file">{RDDOperationGraph.makeDotFile(g)}</div>
{ g.incomingEdges.map { e => <div class="incoming-edge">{e.fromId},{e.toId}</div> } }
{ g.outgoingEdges.map { e => <div class="outgoing-edge">{e.fromId},{e.toId}</div> } }
diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
index 25d5c6ff7e..33a7303be7 100644
--- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
+++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
@@ -52,10 +52,13 @@ private[ui] case class RDDOperationEdge(fromId: Int, toId: Int)
* This represents any grouping of RDDs, including operation scopes (e.g. textFile, flatMap),
* stages, jobs, or any higher level construct. A cluster may be nested inside of other clusters.
*/
-private[ui] class RDDOperationCluster(val id: String, val name: String) {
+private[ui] class RDDOperationCluster(val id: String, private var _name: String) {
private val _childNodes = new ListBuffer[RDDOperationNode]
private val _childClusters = new ListBuffer[RDDOperationCluster]
+ def name: String = _name
+ def setName(n: String): Unit = { _name = n }
+
def childNodes: Seq[RDDOperationNode] = _childNodes.iterator.toSeq
def childClusters: Seq[RDDOperationCluster] = _childClusters.iterator.toSeq
def attachChildNode(childNode: RDDOperationNode): Unit = { _childNodes += childNode }
@@ -71,6 +74,8 @@ private[ui] class RDDOperationCluster(val id: String, val name: String) {
private[ui] object RDDOperationGraph extends Logging {
+ val STAGE_CLUSTER_PREFIX = "stage_"
+
/**
* Construct a RDDOperationGraph for a given stage.
*
@@ -88,7 +93,8 @@ private[ui] object RDDOperationGraph extends Logging {
val clusters = new mutable.HashMap[String, RDDOperationCluster] // indexed by cluster ID
// Root cluster is the stage cluster
- val stageClusterId = s"stage_${stage.stageId}"
+ // Use a special prefix here to differentiate this cluster from other operation clusters
+ val stageClusterId = STAGE_CLUSTER_PREFIX + stage.stageId
val stageClusterName = s"Stage ${stage.stageId}" +
{ if (stage.attemptId == 0) "" else s" (attempt ${stage.attemptId})" }
val rootCluster = new RDDOperationCluster(stageClusterId, stageClusterName)
diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala
index aa9c25cb5c..89119cd357 100644
--- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala
@@ -27,8 +27,15 @@ import org.apache.spark.ui.SparkUI
* A SparkListener that constructs a DAG of RDD operations.
*/
private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListener {
+
+ // Note: the fate of jobs and stages are tied. This means when we clean up a job,
+ // we always clean up all of its stages. Similarly, when we clean up a stage, we
+ // always clean up its job (and, transitively, other stages in the same job).
private[ui] val jobIdToStageIds = new mutable.HashMap[Int, Seq[Int]]
+ private[ui] val jobIdToSkippedStageIds = new mutable.HashMap[Int, Seq[Int]]
+ private[ui] val stageIdToJobId = new mutable.HashMap[Int, Int]
private[ui] val stageIdToGraph = new mutable.HashMap[Int, RDDOperationGraph]
+ private[ui] val completedStageIds = new mutable.HashSet[Int]
// Keep track of the order in which these are inserted so we can remove old ones
private[ui] val jobIds = new mutable.ArrayBuffer[Int]
@@ -40,16 +47,23 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListen
private val retainedStages =
conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES)
- /** Return the graph metadata for the given stage, or None if no such information exists. */
+ /**
+ * Return the graph metadata for all stages in the given job.
+ * An empty list is returned if one or more of its stages has been cleaned up.
+ */
def getOperationGraphForJob(jobId: Int): Seq[RDDOperationGraph] = synchronized {
- val _stageIds = jobIdToStageIds.get(jobId).getOrElse { Seq.empty }
- val graphs = _stageIds.flatMap { sid => stageIdToGraph.get(sid) }
- // If the metadata for some stages have been removed, do not bother rendering this job
- if (_stageIds.size != graphs.size) {
- Seq.empty
- } else {
- graphs
+ val skippedStageIds = jobIdToSkippedStageIds.get(jobId).getOrElse(Seq.empty)
+ val graphs = jobIdToStageIds.get(jobId)
+ .getOrElse(Seq.empty)
+ .flatMap { sid => stageIdToGraph.get(sid) }
+ // Mark any skipped stages as such
+ graphs.foreach { g =>
+ val stageId = g.rootCluster.id.replaceAll(RDDOperationGraph.STAGE_CLUSTER_PREFIX, "").toInt
+ if (skippedStageIds.contains(stageId) && !g.rootCluster.name.contains("skipped")) {
+ g.rootCluster.setName(g.rootCluster.name + " (skipped)")
+ }
}
+ graphs
}
/** Return the graph metadata for the given stage, or None if no such information exists. */
@@ -66,22 +80,68 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListen
jobIdToStageIds(jobId) = jobStart.stageInfos.map(_.stageId).sorted
stageInfos.foreach { stageInfo =>
- stageIds += stageInfo.stageId
- stageIdToGraph(stageInfo.stageId) = RDDOperationGraph.makeOperationGraph(stageInfo)
- // Remove state for old stages
- if (stageIds.size >= retainedStages) {
- val toRemove = math.max(retainedStages / 10, 1)
- stageIds.take(toRemove).foreach { id => stageIdToGraph.remove(id) }
- stageIds.trimStart(toRemove)
- }
+ val stageId = stageInfo.stageId
+ stageIds += stageId
+ stageIdToJobId(stageId) = jobId
+ stageIdToGraph(stageId) = RDDOperationGraph.makeOperationGraph(stageInfo)
+ trimStagesIfNecessary()
+ }
+
+ trimJobsIfNecessary()
+ }
+
+ /** Keep track of stages that have completed. */
+ override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = synchronized {
+ val stageId = stageCompleted.stageInfo.stageId
+ if (stageIdToJobId.contains(stageId)) {
+ // Note: Only do this if the stage has not already been cleaned up
+ // Otherwise, we may never clean this stage from `completedStageIds`
+ completedStageIds += stageCompleted.stageInfo.stageId
+ }
+ }
+
+ /** On job end, find all stages in this job that are skipped and mark them as such. */
+ override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = synchronized {
+ val jobId = jobEnd.jobId
+ jobIdToStageIds.get(jobId).foreach { stageIds =>
+ val skippedStageIds = stageIds.filter { sid => !completedStageIds.contains(sid) }
+ // Note: Only do this if the job has not already been cleaned up
+ // Otherwise, we may never clean this job from `jobIdToSkippedStageIds`
+ jobIdToSkippedStageIds(jobId) = skippedStageIds
}
+ }
+
+ /** Clean metadata for old stages if we have exceeded the number to retain. */
+ private def trimStagesIfNecessary(): Unit = {
+ if (stageIds.size >= retainedStages) {
+ val toRemove = math.max(retainedStages / 10, 1)
+ stageIds.take(toRemove).foreach { id => cleanStage(id) }
+ stageIds.trimStart(toRemove)
+ }
+ }
- // Remove state for old jobs
+ /** Clean metadata for old jobs if we have exceeded the number to retain. */
+ private def trimJobsIfNecessary(): Unit = {
if (jobIds.size >= retainedJobs) {
val toRemove = math.max(retainedJobs / 10, 1)
- jobIds.take(toRemove).foreach { id => jobIdToStageIds.remove(id) }
+ jobIds.take(toRemove).foreach { id => cleanJob(id) }
jobIds.trimStart(toRemove)
}
}
+ /** Clean metadata for the given stage, its job, and all other stages that belong to the job. */
+ private[ui] def cleanStage(stageId: Int): Unit = {
+ completedStageIds.remove(stageId)
+ stageIdToGraph.remove(stageId)
+ stageIdToJobId.remove(stageId).foreach { jobId => cleanJob(jobId) }
+ }
+
+ /** Clean metadata for the given job and all stages that belong to it. */
+ private[ui] def cleanJob(jobId: Int): Unit = {
+ jobIdToSkippedStageIds.remove(jobId)
+ jobIdToStageIds.remove(jobId).foreach { stageIds =>
+ stageIds.foreach { stageId => cleanStage(stageId) }
+ }
+ }
+
}