about summary refs log tree commit diff
path: root/core
diff options
context:
space:
mode:
author Andrew Or <andrew@databricks.com> 2015-12-03 11:09:29 -0800
committer Andrew Or <andrew@databricks.com> 2015-12-03 11:09:29 -0800
commit 688e521c2833a00069272a6749153d721a0996f6 (patch)
tree 8dca718b9f02b07ad18297cb4b9570579f939857 /core
parent 649be4fa4532dcd3001df8345f9f7e970a3fbc65 (diff)
download spark-688e521c2833a00069272a6749153d721a0996f6.tar.gz
spark-688e521c2833a00069272a6749153d721a0996f6.tar.bz2
spark-688e521c2833a00069272a6749153d721a0996f6.zip
[SPARK-12108] Make event logs smaller
**Problem.** Event logs in 1.6 were much bigger than in 1.5. I ran PageRank and the event log in 1.6 was almost 5x the size of the one in 1.5. A bisect showed that the RDD callsite added in #9398 is largely responsible for this. **Solution.** This patch removes the long form of the callsite (which is not used!) from the event log, so that only the short form is written as a plain string. This reduces the size of the event log significantly. *Note on compatibility*: if this patch is merged into 1.6.0, it won't break any compatibility, because the "Callsite" field was only introduced in 1.6.0 and no released version has written the old object format. Otherwise, if it is merged into 1.6.1, we might need to add backward-compatibility handling logic for event logs written by 1.6.0 (no such logic exists yet). Author: Andrew Or <andrew@databricks.com> Closes #10115 from andrewor14/smaller-event-logs.
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/storage/RDDInfo.scala 4
-rw-r--r-- core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala 4
-rw-r--r-- core/src/main/scala/org/apache/spark/util/JsonProtocol.scala 17
-rw-r--r-- core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala 35
4 files changed, 20 insertions, 40 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
index 87c1b981e7..94e8559bd2 100644
--- a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
+++ b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
@@ -28,7 +28,7 @@ class RDDInfo(
val numPartitions: Int,
var storageLevel: StorageLevel,
val parentIds: Seq[Int],
- val callSite: CallSite = CallSite.empty,
+ val callSite: String = "",
val scope: Option[RDDOperationScope] = None)
extends Ordered[RDDInfo] {
@@ -58,6 +58,6 @@ private[spark] object RDDInfo {
val rddName = Option(rdd.name).getOrElse(Utils.getFormattedClassName(rdd))
val parentIds = rdd.dependencies.map(_.rdd.id)
new RDDInfo(rdd.id, rddName, rdd.partitions.length,
- rdd.getStorageLevel, parentIds, rdd.creationSite, rdd.scope)
+ rdd.getStorageLevel, parentIds, rdd.creationSite.shortForm, rdd.scope)
}
}
diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
index 2427456265..e9c8a8e299 100644
--- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
+++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
@@ -39,7 +39,7 @@ private[ui] case class RDDOperationGraph(
rootCluster: RDDOperationCluster)
/** A node in an RDDOperationGraph. This represents an RDD. */
-private[ui] case class RDDOperationNode(id: Int, name: String, cached: Boolean, callsite: CallSite)
+private[ui] case class RDDOperationNode(id: Int, name: String, cached: Boolean, callsite: String)
/**
* A directed edge connecting two nodes in an RDDOperationGraph.
@@ -178,7 +178,7 @@ private[ui] object RDDOperationGraph extends Logging {
/** Return the dot representation of a node in an RDDOperationGraph. */
private def makeDotNode(node: RDDOperationNode): String = {
- val label = s"${node.name} [${node.id}]\n${node.callsite.shortForm}"
+ val label = s"${node.name} [${node.id}]\n${node.callsite}"
s"""${node.id} [label="$label"]"""
}
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index c9beeb25e0..2d2bd90eb3 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -398,7 +398,7 @@ private[spark] object JsonProtocol {
("RDD ID" -> rddInfo.id) ~
("Name" -> rddInfo.name) ~
("Scope" -> rddInfo.scope.map(_.toJson)) ~
- ("Callsite" -> callsiteToJson(rddInfo.callSite)) ~
+ ("Callsite" -> rddInfo.callSite) ~
("Parent IDs" -> parentIds) ~
("Storage Level" -> storageLevel) ~
("Number of Partitions" -> rddInfo.numPartitions) ~
@@ -408,11 +408,6 @@ private[spark] object JsonProtocol {
("Disk Size" -> rddInfo.diskSize)
}
- def callsiteToJson(callsite: CallSite): JValue = {
- ("Short Form" -> callsite.shortForm) ~
- ("Long Form" -> callsite.longForm)
- }
-
def storageLevelToJson(storageLevel: StorageLevel): JValue = {
("Use Disk" -> storageLevel.useDisk) ~
("Use Memory" -> storageLevel.useMemory) ~
@@ -857,9 +852,7 @@ private[spark] object JsonProtocol {
val scope = Utils.jsonOption(json \ "Scope")
.map(_.extract[String])
.map(RDDOperationScope.fromJson)
- val callsite = Utils.jsonOption(json \ "Callsite")
- .map(callsiteFromJson)
- .getOrElse(CallSite.empty)
+ val callsite = Utils.jsonOption(json \ "Callsite").map(_.extract[String]).getOrElse("")
val parentIds = Utils.jsonOption(json \ "Parent IDs")
.map { l => l.extract[List[JValue]].map(_.extract[Int]) }
.getOrElse(Seq.empty)
@@ -880,12 +873,6 @@ private[spark] object JsonProtocol {
rddInfo
}
- def callsiteFromJson(json: JValue): CallSite = {
- val shortForm = (json \ "Short Form").extract[String]
- val longForm = (json \ "Long Form").extract[String]
- CallSite(shortForm, longForm)
- }
-
def storageLevelFromJson(json: JValue): StorageLevel = {
val useDisk = (json \ "Use Disk").extract[Boolean]
val useMemory = (json \ "Use Memory").extract[Boolean]
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 3f94ef7041..1939ce5c74 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -111,7 +111,6 @@ class JsonProtocolSuite extends SparkFunSuite {
test("Dependent Classes") {
val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap
testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L))
- testCallsite(CallSite("happy", "birthday"))
testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L))
testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false))
testTaskMetrics(makeTaskMetrics(
@@ -343,13 +342,13 @@ class JsonProtocolSuite extends SparkFunSuite {
// "Scope" and "Parent IDs" were introduced in Spark 1.4.0
// "Callsite" was introduced in Spark 1.6.0
val rddInfo = new RDDInfo(1, "one", 100, StorageLevel.NONE, Seq(1, 6, 8),
- CallSite("short", "long"), Some(new RDDOperationScope("fable")))
+ "callsite", Some(new RDDOperationScope("fable")))
val oldRddInfoJson = JsonProtocol.rddInfoToJson(rddInfo)
.removeField({ _._1 == "Parent IDs"})
.removeField({ _._1 == "Scope"})
.removeField({ _._1 == "Callsite"})
val expectedRddInfo = new RDDInfo(
- 1, "one", 100, StorageLevel.NONE, Seq.empty, CallSite.empty, scope = None)
+ 1, "one", 100, StorageLevel.NONE, Seq.empty, "", scope = None)
assertEquals(expectedRddInfo, JsonProtocol.rddInfoFromJson(oldRddInfoJson))
}
@@ -397,11 +396,6 @@ class JsonProtocolSuite extends SparkFunSuite {
assertEquals(info, newInfo)
}
- private def testCallsite(callsite: CallSite): Unit = {
- val newCallsite = JsonProtocol.callsiteFromJson(JsonProtocol.callsiteToJson(callsite))
- assert(callsite === newCallsite)
- }
-
private def testStageInfo(info: StageInfo) {
val newInfo = JsonProtocol.stageInfoFromJson(JsonProtocol.stageInfoToJson(info))
assertEquals(info, newInfo)
@@ -726,8 +720,7 @@ class JsonProtocolSuite extends SparkFunSuite {
}
private def makeRddInfo(a: Int, b: Int, c: Int, d: Long, e: Long) = {
- val r = new RDDInfo(a, "mayor", b, StorageLevel.MEMORY_AND_DISK,
- Seq(1, 4, 7), CallSite(a.toString, b.toString))
+ val r = new RDDInfo(a, "mayor", b, StorageLevel.MEMORY_AND_DISK, Seq(1, 4, 7), a.toString)
r.numCachedPartitions = c
r.memSize = d
r.diskSize = e
@@ -870,7 +863,7 @@ class JsonProtocolSuite extends SparkFunSuite {
| {
| "RDD ID": 101,
| "Name": "mayor",
- | "Callsite": {"Short Form": "101", "Long Form": "201"},
+ | "Callsite": "101",
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
@@ -1273,7 +1266,7 @@ class JsonProtocolSuite extends SparkFunSuite {
| {
| "RDD ID": 1,
| "Name": "mayor",
- | "Callsite": {"Short Form": "1", "Long Form": "200"},
+ | "Callsite": "1",
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
@@ -1317,7 +1310,7 @@ class JsonProtocolSuite extends SparkFunSuite {
| {
| "RDD ID": 2,
| "Name": "mayor",
- | "Callsite": {"Short Form": "2", "Long Form": "400"},
+ | "Callsite": "2",
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
@@ -1335,7 +1328,7 @@ class JsonProtocolSuite extends SparkFunSuite {
| {
| "RDD ID": 3,
| "Name": "mayor",
- | "Callsite": {"Short Form": "3", "Long Form": "401"},
+ | "Callsite": "3",
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
@@ -1379,7 +1372,7 @@ class JsonProtocolSuite extends SparkFunSuite {
| {
| "RDD ID": 3,
| "Name": "mayor",
- | "Callsite": {"Short Form": "3", "Long Form": "600"},
+ | "Callsite": "3",
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
@@ -1397,7 +1390,7 @@ class JsonProtocolSuite extends SparkFunSuite {
| {
| "RDD ID": 4,
| "Name": "mayor",
- | "Callsite": {"Short Form": "4", "Long Form": "601"},
+ | "Callsite": "4",
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
@@ -1415,7 +1408,7 @@ class JsonProtocolSuite extends SparkFunSuite {
| {
| "RDD ID": 5,
| "Name": "mayor",
- | "Callsite": {"Short Form": "5", "Long Form": "602"},
+ | "Callsite": "5",
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
@@ -1459,7 +1452,7 @@ class JsonProtocolSuite extends SparkFunSuite {
| {
| "RDD ID": 4,
| "Name": "mayor",
- | "Callsite": {"Short Form": "4", "Long Form": "800"},
+ | "Callsite": "4",
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
@@ -1477,7 +1470,7 @@ class JsonProtocolSuite extends SparkFunSuite {
| {
| "RDD ID": 5,
| "Name": "mayor",
- | "Callsite": {"Short Form": "5", "Long Form": "801"},
+ | "Callsite": "5",
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
@@ -1495,7 +1488,7 @@ class JsonProtocolSuite extends SparkFunSuite {
| {
| "RDD ID": 6,
| "Name": "mayor",
- | "Callsite": {"Short Form": "6", "Long Form": "802"},
+ | "Callsite": "6",
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
@@ -1513,7 +1506,7 @@ class JsonProtocolSuite extends SparkFunSuite {
| {
| "RDD ID": 7,
| "Name": "mayor",
- | "Callsite": {"Short Form": "7", "Long Form": "803"},
+ | "Callsite": "7",
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,