diff options
author | Carson Wang <carson.wang@intel.com> | 2015-10-15 10:36:54 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-10-15 10:36:54 -0700 |
commit | d45a0d3ca23df86cf0a95508ccc3b4b98f1b611c (patch) | |
tree | f8d3a36004b430aa89f35981357800ddab6e423d /core | |
parent | 523adc24a683930304f408d477607edfe9de7b76 (diff) | |
download | spark-d45a0d3ca23df86cf0a95508ccc3b4b98f1b611c.tar.gz spark-d45a0d3ca23df86cf0a95508ccc3b4b98f1b611c.tar.bz2 spark-d45a0d3ca23df86cf0a95508ccc3b4b98f1b611c.zip |
[SPARK-11047] Internal accumulators miss the internal flag when replaying events in the history server
Internal accumulators don't write the internal flag to event log. So on the history server Web UI, all accumulators are not internal. This causes incorrect peak execution memory and unwanted accumulator table displayed on the stage page.
To fix it, I add the "internal" property of AccumulableInfo when writing the event log.
Author: Carson Wang <carson.wang@intel.com>
Closes #9061 from carsonwang/accumulableBug.
Diffstat (limited to 'core')
3 files changed, 79 insertions, 32 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala index b6bff64ee3..146cfb9ba8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala @@ -46,6 +46,15 @@ class AccumulableInfo private[spark] ( } object AccumulableInfo { + def apply( + id: Long, + name: String, + update: Option[String], + value: String, + internal: Boolean): AccumulableInfo = { + new AccumulableInfo(id, name, update, value, internal) + } + def apply(id: Long, name: String, update: Option[String], value: String): AccumulableInfo = { new AccumulableInfo(id, name, update, value, internal = false) } diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 40729fa5a4..a06dc6f709 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -282,7 +282,8 @@ private[spark] object JsonProtocol { ("ID" -> accumulableInfo.id) ~ ("Name" -> accumulableInfo.name) ~ ("Update" -> accumulableInfo.update.map(new JString(_)).getOrElse(JNothing)) ~ - ("Value" -> accumulableInfo.value) + ("Value" -> accumulableInfo.value) ~ + ("Internal" -> accumulableInfo.internal) } def taskMetricsToJson(taskMetrics: TaskMetrics): JValue = { @@ -696,7 +697,8 @@ private[spark] object JsonProtocol { val name = (json \ "Name").extract[String] val update = Utils.jsonOption(json \ "Update").map(_.extract[String]) val value = (json \ "Value").extract[String] - AccumulableInfo(id, name, update, value) + val internal = (json \ "Internal").extractOpt[Boolean].getOrElse(false) + AccumulableInfo(id, name, update, value, internal) } def taskMetricsFromJson(json: JValue): TaskMetrics = { diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index a24bf2931c..f9572921f4 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -364,6 +364,15 @@ class JsonProtocolSuite extends SparkFunSuite { assertEquals(expectedDenied, JsonProtocol.taskEndReasonFromJson(oldDenied)) } + test("AccumulableInfo backward compatibility") { + // "Internal" property of AccumulableInfo were added after 1.5.1. + val accumulableInfo = makeAccumulableInfo(1) + val oldJson = JsonProtocol.accumulableInfoToJson(accumulableInfo) + .removeField({ _._1 == "Internal" }) + val oldInfo = JsonProtocol.accumulableInfoFromJson(oldJson) + assert(false === oldInfo.internal) + } + /** -------------------------- * | Helper test running methods | * --------------------------- */ @@ -723,15 +732,15 @@ class JsonProtocolSuite extends SparkFunSuite { val taskInfo = new TaskInfo(a, b, c, d, "executor", "your kind sir", TaskLocality.NODE_LOCAL, speculative) val (acc1, acc2, acc3) = - (makeAccumulableInfo(1), makeAccumulableInfo(2), makeAccumulableInfo(3)) + (makeAccumulableInfo(1), makeAccumulableInfo(2), makeAccumulableInfo(3, internal = true)) taskInfo.accumulables += acc1 taskInfo.accumulables += acc2 taskInfo.accumulables += acc3 taskInfo } - private def makeAccumulableInfo(id: Int): AccumulableInfo = - AccumulableInfo(id, " Accumulable " + id, Some("delta" + id), "val" + id) + private def makeAccumulableInfo(id: Int, internal: Boolean = false): AccumulableInfo = + AccumulableInfo(id, " Accumulable " + id, Some("delta" + id), "val" + id, internal) /** * Creates a TaskMetrics object describing a task that read data from Hadoop (if hasHadoopInput is @@ -812,13 +821,15 @@ class JsonProtocolSuite extends SparkFunSuite { | "ID": 2, | "Name": "Accumulable2", | "Update": "delta2", - | "Value": "val2" + | "Value": "val2", + | "Internal": false | }, | { | "ID": 1, | "Name": "Accumulable1", | "Update": "delta1", - | "Value": "val1" + | "Value": "val1", + | "Internal": false | } | ] | }, @@ -866,13 +877,15 @@ class JsonProtocolSuite extends SparkFunSuite { | "ID": 2, | "Name": "Accumulable2", | "Update": "delta2", - | "Value": "val2" + | "Value": "val2", + | "Internal": false | }, | { | "ID": 1, | "Name": "Accumulable1", | "Update": "delta1", - | "Value": "val1" + | "Value": "val1", + | "Internal": false | } | ] | } @@ -902,19 +915,22 @@ class JsonProtocolSuite extends SparkFunSuite { | "ID": 1, | "Name": "Accumulable1", | "Update": "delta1", - | "Value": "val1" + | "Value": "val1", + | "Internal": false | }, | { | "ID": 2, | "Name": "Accumulable2", | "Update": "delta2", - | "Value": "val2" + | "Value": "val2", + | "Internal": false | }, | { | "ID": 3, | "Name": "Accumulable3", | "Update": "delta3", - | "Value": "val3" + | "Value": "val3", + | "Internal": true | } | ] | } @@ -942,19 +958,22 @@ class JsonProtocolSuite extends SparkFunSuite { | "ID": 1, | "Name": "Accumulable1", | "Update": "delta1", - | "Value": "val1" + | "Value": "val1", + | "Internal": false | }, | { | "ID": 2, | "Name": "Accumulable2", | "Update": "delta2", - | "Value": "val2" + | "Value": "val2", + | "Internal": false | }, | { | "ID": 3, | "Name": "Accumulable3", | "Update": "delta3", - | "Value": "val3" + | "Value": "val3", + | "Internal": true | } | ] | } @@ -988,19 +1007,22 @@ class JsonProtocolSuite extends SparkFunSuite { | "ID": 1, | "Name": "Accumulable1", | "Update": "delta1", - | "Value": "val1" + | "Value": "val1", + | "Internal": false | }, | { | "ID": 2, | "Name": "Accumulable2", | "Update": "delta2", - | "Value": "val2" + | "Value": "val2", + | "Internal": false | }, | { | "ID": 3, | "Name": "Accumulable3", | "Update": "delta3", - | "Value": "val3" + | "Value": "val3", + | "Internal": true | } | ] | }, @@ -1074,19 +1096,22 @@ class JsonProtocolSuite extends SparkFunSuite { | "ID": 1, | "Name": "Accumulable1", | "Update": "delta1", - | "Value": "val1" + | "Value": "val1", + | "Internal": false | }, | { | "ID": 2, | "Name": "Accumulable2", | "Update": "delta2", - | "Value": "val2" + | "Value": "val2", + | "Internal": false | }, | { | "ID": 3, | "Name": "Accumulable3", | "Update": "delta3", - | "Value": "val3" + | "Value": "val3", + | "Internal": true | } | ] | }, @@ -1157,19 +1182,22 @@ class JsonProtocolSuite extends SparkFunSuite { | "ID": 1, | "Name": "Accumulable1", | "Update": "delta1", - | "Value": "val1" + | "Value": "val1", + | "Internal": false | }, | { | "ID": 2, | "Name": "Accumulable2", | "Update": "delta2", - | "Value": "val2" + | "Value": "val2", + | "Internal": false | }, | { | "ID": 3, | "Name": "Accumulable3", | "Update": "delta3", - | "Value": "val3" + | "Value": "val3", + | "Internal": true | } | ] | }, @@ -1251,13 +1279,15 @@ class JsonProtocolSuite extends SparkFunSuite { | "ID": 2, | "Name": " Accumulable 2", | "Update": "delta2", - | "Value": "val2" + | "Value": "val2", + | "Internal": false | }, | { | "ID": 1, | "Name": " Accumulable 1", | "Update": "delta1", - | "Value": "val1" + | "Value": "val1", + | "Internal": false | } | ] | }, @@ -1309,13 +1339,15 @@ class JsonProtocolSuite extends SparkFunSuite { | "ID": 2, | "Name": " Accumulable 2", | "Update": "delta2", - | "Value": "val2" + | "Value": "val2", + | "Internal": false | }, | { | "ID": 1, | "Name": " Accumulable 1", | "Update": "delta1", - | "Value": "val1" + | "Value": "val1", + | "Internal": false | } | ] | }, @@ -1384,13 +1416,15 @@ class JsonProtocolSuite extends SparkFunSuite { | "ID": 2, | "Name": " Accumulable 2", | "Update": "delta2", - | "Value": "val2" + | "Value": "val2", + | "Internal": false | }, | { | "ID": 1, | "Name": " Accumulable 1", | "Update": "delta1", - | "Value": "val1" + | "Value": "val1", + | "Internal": false | } | ] | }, @@ -1476,13 +1510,15 @@ class JsonProtocolSuite extends SparkFunSuite { | "ID": 2, | "Name": " Accumulable 2", | "Update": "delta2", - | "Value": "val2" + | "Value": "val2", + | "Internal": false | }, | { | "ID": 1, | "Name": " Accumulable 1", | "Update": "delta1", - | "Value": "val1" + | "Value": "val1", + | "Internal": false | } | ] | } |