author     Andrew Or <andrew@databricks.com>    2015-11-07 05:35:53 +0100
committer  Andrew Or <andrew@databricks.com>    2015-11-07 05:35:53 +0100
commit     7f741905b06ed6d3dfbff6db41a3355dab71aa3c (patch)
tree       73937a738241b065d82c2593b2063d6166869b32 /core/src
parent     30b706b7b36482921ec04145a0121ca147984fa8 (diff)
[SPARK-11112] DAG visualization: display RDD callsite
Screenshot: https://cloud.githubusercontent.com/assets/2133137/10870343/2a8cd070-807d-11e5-857a-4ebcace77b5b.png

mateiz sarutak

Author: Andrew Or <andrew@databricks.com>

Closes #9398 from andrewor14/rdd-callsite.
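A hypothetical end-to-end demo (not part of the patch) of what this change surfaces: each RDD node in a stage's DAG visualization is now labeled with the RDD's creation site, e.g. "map at CallsiteDemo.scala:10". All names below are illustrative.

    import org.apache.spark.{SparkConf, SparkContext}

    object CallsiteDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("callsite-demo").setMaster("local[2]")
        val sc = new SparkContext(conf)
        val rdd = sc.parallelize(1 to 100).map(_ * 2) // creation sites recorded here
        rdd.groupBy(_ % 10).count()                   // forces a multi-stage DAG
        // While the app runs, http://localhost:4040 shows callsites in the DAG viz.
        sc.stop()
      }
    }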
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css  |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/RDDInfo.scala            | 16
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala          | 17
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                 |  1
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala         | 14
-rw-r--r--  core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala     | 37
7 files changed, 79 insertions(+), 20 deletions(-)
diff --git a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css
index 3b4ae2ed35..9cc5c79f67 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css
+++ b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css
@@ -122,3 +122,7 @@
stroke: #52C366;
stroke-width: 2px;
}
+
+.tooltip-inner {
+ white-space: pre-wrap;
+}
diff --git a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
index 96062626b5..3fa209b924 100644
--- a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
+++ b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
@@ -19,7 +19,7 @@ package org.apache.spark.storage
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.{RDDOperationScope, RDD}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{CallSite, Utils}
@DeveloperApi
class RDDInfo(
@@ -28,9 +28,20 @@ class RDDInfo(
val numPartitions: Int,
var storageLevel: StorageLevel,
val parentIds: Seq[Int],
+ val callSite: CallSite,
val scope: Option[RDDOperationScope] = None)
extends Ordered[RDDInfo] {
+ def this(
+ id: Int,
+ name: String,
+ numPartitions: Int,
+ storageLevel: StorageLevel,
+ parentIds: Seq[Int],
+ scope: Option[RDDOperationScope] = None) {
+ this(id, name, numPartitions, storageLevel, parentIds, CallSite.empty, scope)
+ }
+
var numCachedPartitions = 0
var memSize = 0L
var diskSize = 0L
@@ -56,6 +67,7 @@ private[spark] object RDDInfo {
def fromRdd(rdd: RDD[_]): RDDInfo = {
val rddName = Option(rdd.name).getOrElse(Utils.getFormattedClassName(rdd))
val parentIds = rdd.dependencies.map(_.rdd.id)
- new RDDInfo(rdd.id, rddName, rdd.partitions.length, rdd.getStorageLevel, parentIds, rdd.scope)
+ new RDDInfo(rdd.id, rddName, rdd.partitions.length,
+ rdd.getStorageLevel, parentIds, rdd.creationSite, rdd.scope)
}
}
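A note on the auxiliary constructor added above: it preserves the old six-argument signature so existing RDDInfo callers (including external users of this @DeveloperApi class) compile unchanged, defaulting the new field to CallSite.empty. A minimal sketch of the idiom with simplified stand-in types, not Spark's real classes:

    case class CallSite(shortForm: String, longForm: String)
    object CallSite { val empty = CallSite("", "") }

    class Info(val id: Int, val name: String, val callSite: CallSite) {
      // Auxiliary constructor: legacy callers that never pass a callsite
      // still compile, and get the empty sentinel rather than a null.
      def this(id: Int, name: String) = this(id, name, CallSite.empty)
    }

    object InfoDemo extends App {
      val a = new Info(1, "one", CallSite("map at X.scala:5", "..."))
      val b = new Info(2, "two")
      assert(b.callSite == CallSite.empty) // old-style call gets the sentinel
    }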
diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
index 81f168a447..2427456265 100644
--- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
+++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable.{StringBuilder, ListBuffer}
import org.apache.spark.Logging
import org.apache.spark.scheduler.StageInfo
import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.CallSite
/**
* A representation of a generic cluster graph used for storing information on RDD operations.
@@ -38,7 +39,7 @@ private[ui] case class RDDOperationGraph(
rootCluster: RDDOperationCluster)
/** A node in an RDDOperationGraph. This represents an RDD. */
-private[ui] case class RDDOperationNode(id: Int, name: String, cached: Boolean)
+private[ui] case class RDDOperationNode(id: Int, name: String, cached: Boolean, callsite: CallSite)
/**
* A directed edge connecting two nodes in an RDDOperationGraph.
@@ -104,8 +105,8 @@ private[ui] object RDDOperationGraph extends Logging {
edges ++= rdd.parentIds.map { parentId => RDDOperationEdge(parentId, rdd.id) }
// TODO: differentiate between the intention to cache an RDD and whether it's actually cached
- val node = nodes.getOrElseUpdate(
- rdd.id, RDDOperationNode(rdd.id, rdd.name, rdd.storageLevel != StorageLevel.NONE))
+ val node = nodes.getOrElseUpdate(rdd.id, RDDOperationNode(
+ rdd.id, rdd.name, rdd.storageLevel != StorageLevel.NONE, rdd.callSite))
if (rdd.scope.isEmpty) {
// This RDD has no encompassing scope, so we put it directly in the root cluster
@@ -177,7 +178,8 @@ private[ui] object RDDOperationGraph extends Logging {
/** Return the dot representation of a node in an RDDOperationGraph. */
private def makeDotNode(node: RDDOperationNode): String = {
- s"""${node.id} [label="${node.name} [${node.id}]"]"""
+ val label = s"${node.name} [${node.id}]\n${node.callsite.shortForm}"
+ s"""${node.id} [label="$label"]"""
}
/** Update the dot representation of the RDDOperationGraph in cluster to subgraph. */
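The two-line label built here is also why the CSS hunk at the top of this diff sets white-space: pre-wrap on .tooltip-inner: the label carries a literal \n, which the tooltip would otherwise collapse into one line. A self-contained sketch of the label construction (stand-in types, not Spark's):

    object DotLabelSketch extends App {
      case class CallSite(shortForm: String, longForm: String)
      case class Node(id: Int, name: String, cached: Boolean, callsite: CallSite)

      def makeDotNode(node: Node): String = {
        val label = s"${node.name} [${node.id}]\n${node.callsite.shortForm}"
        s"""${node.id} [label="$label"]"""
      }

      // Prints a two-line DOT label:
      //   1 [label="MapPartitionsRDD [1]
      //   map at Demo.scala:7"]
      println(makeDotNode(Node(1, "MapPartitionsRDD", cached = false,
        CallSite("map at Demo.scala:7", ""))))
    }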
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index ee2eb58cf5..c9beeb25e0 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -398,6 +398,7 @@ private[spark] object JsonProtocol {
("RDD ID" -> rddInfo.id) ~
("Name" -> rddInfo.name) ~
("Scope" -> rddInfo.scope.map(_.toJson)) ~
+ ("Callsite" -> callsiteToJson(rddInfo.callSite)) ~
("Parent IDs" -> parentIds) ~
("Storage Level" -> storageLevel) ~
("Number of Partitions" -> rddInfo.numPartitions) ~
@@ -407,6 +408,11 @@ private[spark] object JsonProtocol {
("Disk Size" -> rddInfo.diskSize)
}
+ def callsiteToJson(callsite: CallSite): JValue = {
+ ("Short Form" -> callsite.shortForm) ~
+ ("Long Form" -> callsite.longForm)
+ }
+
def storageLevelToJson(storageLevel: StorageLevel): JValue = {
("Use Disk" -> storageLevel.useDisk) ~
("Use Memory" -> storageLevel.useMemory) ~
@@ -851,6 +857,9 @@ private[spark] object JsonProtocol {
val scope = Utils.jsonOption(json \ "Scope")
.map(_.extract[String])
.map(RDDOperationScope.fromJson)
+ val callsite = Utils.jsonOption(json \ "Callsite")
+ .map(callsiteFromJson)
+ .getOrElse(CallSite.empty)
val parentIds = Utils.jsonOption(json \ "Parent IDs")
.map { l => l.extract[List[JValue]].map(_.extract[Int]) }
.getOrElse(Seq.empty)
@@ -863,7 +872,7 @@ private[spark] object JsonProtocol {
.getOrElse(json \ "Tachyon Size").extract[Long]
val diskSize = (json \ "Disk Size").extract[Long]
- val rddInfo = new RDDInfo(rddId, name, numPartitions, storageLevel, parentIds, scope)
+ val rddInfo = new RDDInfo(rddId, name, numPartitions, storageLevel, parentIds, callsite, scope)
rddInfo.numCachedPartitions = numCachedPartitions
rddInfo.memSize = memSize
rddInfo.externalBlockStoreSize = externalBlockStoreSize
@@ -871,6 +880,12 @@ private[spark] object JsonProtocol {
rddInfo
}
+ def callsiteFromJson(json: JValue): CallSite = {
+ val shortForm = (json \ "Short Form").extract[String]
+ val longForm = (json \ "Long Form").extract[String]
+ CallSite(shortForm, longForm)
+ }
+
def storageLevelFromJson(json: JValue): StorageLevel = {
val useDisk = (json \ "Use Disk").extract[Boolean]
val useMemory = (json \ "Use Memory").extract[Boolean]
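A self-contained round-trip sketch of the two helpers above, using json4s as JsonProtocol does. The Utils.jsonOption(...).getOrElse(CallSite.empty) branch is the backward-compatibility path: event logs written before this patch carry no "Callsite" field. Stand-in types below, not the real Spark classes:

    import org.json4s._
    import org.json4s.JsonDSL._

    object CallsiteJsonSketch {
      implicit val formats: Formats = DefaultFormats

      case class CallSite(shortForm: String, longForm: String)
      object CallSite { val empty = CallSite("", "") }

      def callsiteToJson(cs: CallSite): JValue =
        ("Short Form" -> cs.shortForm) ~ ("Long Form" -> cs.longForm)

      def callsiteFromJson(json: JValue): CallSite =
        CallSite((json \ "Short Form").extract[String],
                 (json \ "Long Form").extract[String])

      def main(args: Array[String]): Unit = {
        val cs = CallSite("map at X.scala:5", "X.main(X.scala:5)")
        assert(callsiteFromJson(callsiteToJson(cs)) == cs) // round trip

        // A pre-1.6 record has no "Callsite" field; fall back to the sentinel.
        val oldRecord: JValue = JObject("RDD ID" -> JInt(1))
        val recovered = (oldRecord \ "Callsite") match {
          case JNothing => CallSite.empty
          case v        => callsiteFromJson(v)
        }
        assert(recovered == CallSite.empty)
      }
    }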
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 5a976ee839..316c194ff3 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -57,6 +57,7 @@ private[spark] case class CallSite(shortForm: String, longForm: String)
private[spark] object CallSite {
val SHORT_FORM = "callSite.short"
val LONG_FORM = "callSite.long"
+ val empty = CallSite("", "")
}
/**
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index 18eec7da97..ceecfd665b 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -615,29 +615,29 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
assert(stage0.contains("digraph G {\n subgraph clusterstage_0 {\n " +
"label=&quot;Stage 0&quot;;\n subgraph "))
assert(stage0.contains("{\n label=&quot;parallelize&quot;;\n " +
- "0 [label=&quot;ParallelCollectionRDD [0]&quot;];\n }"))
+ "0 [label=&quot;ParallelCollectionRDD [0]"))
assert(stage0.contains("{\n label=&quot;map&quot;;\n " +
- "1 [label=&quot;MapPartitionsRDD [1]&quot;];\n }"))
+ "1 [label=&quot;MapPartitionsRDD [1]"))
assert(stage0.contains("{\n label=&quot;groupBy&quot;;\n " +
- "2 [label=&quot;MapPartitionsRDD [2]&quot;];\n }"))
+ "2 [label=&quot;MapPartitionsRDD [2]"))
val stage1 = Source.fromURL(sc.ui.get.appUIAddress +
"/stages/stage/?id=1&attempt=0&expandDagViz=true").mkString
assert(stage1.contains("digraph G {\n subgraph clusterstage_1 {\n " +
"label=&quot;Stage 1&quot;;\n subgraph "))
assert(stage1.contains("{\n label=&quot;groupBy&quot;;\n " +
- "3 [label=&quot;ShuffledRDD [3]&quot;];\n }"))
+ "3 [label=&quot;ShuffledRDD [3]"))
assert(stage1.contains("{\n label=&quot;map&quot;;\n " +
- "4 [label=&quot;MapPartitionsRDD [4]&quot;];\n }"))
+ "4 [label=&quot;MapPartitionsRDD [4]"))
assert(stage1.contains("{\n label=&quot;groupBy&quot;;\n " +
- "5 [label=&quot;MapPartitionsRDD [5]&quot;];\n }"))
+ "5 [label=&quot;MapPartitionsRDD [5]"))
val stage2 = Source.fromURL(sc.ui.get.appUIAddress +
"/stages/stage/?id=2&attempt=0&expandDagViz=true").mkString
assert(stage2.contains("digraph G {\n subgraph clusterstage_2 {\n " +
"label=&quot;Stage 2&quot;;\n subgraph "))
assert(stage2.contains("{\n label=&quot;groupBy&quot;;\n " +
- "6 [label=&quot;ShuffledRDD [6]&quot;];\n }"))
+ "6 [label=&quot;ShuffledRDD [6]"))
}
}
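Note why these expected strings dropped their closing &quot;];\n } suffix: each label now ends with a callsite ("operation at File.scala:line") whose exact text depends on where in the suite the RDD was created, so the assertions pin only the stable prefix. A tiny illustration with a hypothetical rendered string:

    object PrefixAssertSketch extends App {
      // Hypothetical post-patch output; the trailing callsite varies by source
      // location, so only the stable prefix is asserted.
      val rendered =
        "0 [label=&quot;ParallelCollectionRDD [0]\nparallelize at UISeleniumSuite.scala:60&quot;];"
      assert(rendered.contains("0 [label=&quot;ParallelCollectionRDD [0]"))
    }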
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 953456c2ca..3f94ef7041 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -111,6 +111,7 @@ class JsonProtocolSuite extends SparkFunSuite {
test("Dependent Classes") {
val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap
testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L))
+ testCallsite(CallSite("happy", "birthday"))
testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L))
testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false))
testTaskMetrics(makeTaskMetrics(
@@ -163,6 +164,10 @@ class JsonProtocolSuite extends SparkFunSuite {
testBlockId(StreamBlockId(1, 2L))
}
+ /* ============================== *
+ | Backward compatibility tests |
+ * ============================== */
+
test("ExceptionFailure backward compatibility") {
val exceptionFailure = ExceptionFailure("To be", "or not to be", stackTrace, null,
None, None)
@@ -334,14 +339,17 @@ class JsonProtocolSuite extends SparkFunSuite {
assertEquals(expectedJobEnd, JsonProtocol.jobEndFromJson(oldEndEvent))
}
- test("RDDInfo backward compatibility (scope, parent IDs)") {
- // Prior to Spark 1.4.0, RDDInfo did not have the "Scope" and "Parent IDs" properties
- val rddInfo = new RDDInfo(
- 1, "one", 100, StorageLevel.NONE, Seq(1, 6, 8), Some(new RDDOperationScope("fable")))
+ test("RDDInfo backward compatibility (scope, parent IDs, callsite)") {
+ // "Scope" and "Parent IDs" were introduced in Spark 1.4.0
+ // "Callsite" was introduced in Spark 1.6.0
+ val rddInfo = new RDDInfo(1, "one", 100, StorageLevel.NONE, Seq(1, 6, 8),
+ CallSite("short", "long"), Some(new RDDOperationScope("fable")))
val oldRddInfoJson = JsonProtocol.rddInfoToJson(rddInfo)
.removeField({ _._1 == "Parent IDs"})
.removeField({ _._1 == "Scope"})
- val expectedRddInfo = new RDDInfo(1, "one", 100, StorageLevel.NONE, Seq.empty, scope = None)
+ .removeField({ _._1 == "Callsite"})
+ val expectedRddInfo = new RDDInfo(
+ 1, "one", 100, StorageLevel.NONE, Seq.empty, CallSite.empty, scope = None)
assertEquals(expectedRddInfo, JsonProtocol.rddInfoFromJson(oldRddInfoJson))
}
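The strip-and-reparse idiom above (serialize the new JSON, removeField the new keys, reparse) is how the suite simulates logs written by older Spark versions. A compact json4s sketch of the same move, with simplified stand-in JSON:

    import org.json4s._
    import org.json4s.JsonDSL._

    object StripFieldSketch extends App {
      val full: JValue =
        ("Name" -> "one") ~
        ("Callsite" -> (("Short Form" -> "short") ~ ("Long Form" -> "long")))

      // Drop the field a pre-1.6 writer would never have emitted...
      val old = full.removeField { case (name, _) => name == "Callsite" }

      // ...and confirm a reader sees it as absent.
      assert((old \ "Callsite") == JNothing)
    }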
@@ -389,6 +397,11 @@ class JsonProtocolSuite extends SparkFunSuite {
assertEquals(info, newInfo)
}
+ private def testCallsite(callsite: CallSite): Unit = {
+ val newCallsite = JsonProtocol.callsiteFromJson(JsonProtocol.callsiteToJson(callsite))
+ assert(callsite === newCallsite)
+ }
+
private def testStageInfo(info: StageInfo) {
val newInfo = JsonProtocol.stageInfoFromJson(JsonProtocol.stageInfoToJson(info))
assertEquals(info, newInfo)
@@ -713,7 +726,8 @@ class JsonProtocolSuite extends SparkFunSuite {
}
private def makeRddInfo(a: Int, b: Int, c: Int, d: Long, e: Long) = {
- val r = new RDDInfo(a, "mayor", b, StorageLevel.MEMORY_AND_DISK, Seq(1, 4, 7))
+ val r = new RDDInfo(a, "mayor", b, StorageLevel.MEMORY_AND_DISK,
+ Seq(1, 4, 7), CallSite(a.toString, b.toString))
r.numCachedPartitions = c
r.memSize = d
r.diskSize = e
@@ -856,6 +870,7 @@ class JsonProtocolSuite extends SparkFunSuite {
| {
| "RDD ID": 101,
| "Name": "mayor",
+ | "Callsite": {"Short Form": "101", "Long Form": "201"},
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
@@ -1258,6 +1273,7 @@ class JsonProtocolSuite extends SparkFunSuite {
| {
| "RDD ID": 1,
| "Name": "mayor",
+ | "Callsite": {"Short Form": "1", "Long Form": "200"},
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
@@ -1301,6 +1317,7 @@ class JsonProtocolSuite extends SparkFunSuite {
| {
| "RDD ID": 2,
| "Name": "mayor",
+ | "Callsite": {"Short Form": "2", "Long Form": "400"},
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
@@ -1318,6 +1335,7 @@ class JsonProtocolSuite extends SparkFunSuite {
| {
| "RDD ID": 3,
| "Name": "mayor",
+ | "Callsite": {"Short Form": "3", "Long Form": "401"},
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
@@ -1361,6 +1379,7 @@ class JsonProtocolSuite extends SparkFunSuite {
| {
| "RDD ID": 3,
| "Name": "mayor",
+ | "Callsite": {"Short Form": "3", "Long Form": "600"},
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
@@ -1378,6 +1397,7 @@ class JsonProtocolSuite extends SparkFunSuite {
| {
| "RDD ID": 4,
| "Name": "mayor",
+ | "Callsite": {"Short Form": "4", "Long Form": "601"},
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
@@ -1395,6 +1415,7 @@ class JsonProtocolSuite extends SparkFunSuite {
| {
| "RDD ID": 5,
| "Name": "mayor",
+ | "Callsite": {"Short Form": "5", "Long Form": "602"},
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
@@ -1438,6 +1459,7 @@ class JsonProtocolSuite extends SparkFunSuite {
| {
| "RDD ID": 4,
| "Name": "mayor",
+ | "Callsite": {"Short Form": "4", "Long Form": "800"},
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
@@ -1455,6 +1477,7 @@ class JsonProtocolSuite extends SparkFunSuite {
| {
| "RDD ID": 5,
| "Name": "mayor",
+ | "Callsite": {"Short Form": "5", "Long Form": "801"},
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
@@ -1472,6 +1495,7 @@ class JsonProtocolSuite extends SparkFunSuite {
| {
| "RDD ID": 6,
| "Name": "mayor",
+ | "Callsite": {"Short Form": "6", "Long Form": "802"},
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
@@ -1489,6 +1513,7 @@ class JsonProtocolSuite extends SparkFunSuite {
| {
| "RDD ID": 7,
| "Name": "mayor",
+ | "Callsite": {"Short Form": "7", "Long Form": "803"},
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,