diff options
Diffstat (limited to 'core')
3 files changed, 79 insertions, 32 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala index b6bff64ee3..146cfb9ba8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala @@ -46,6 +46,15 @@ class AccumulableInfo private[spark] ( } object AccumulableInfo { + def apply( + id: Long, + name: String, + update: Option[String], + value: String, + internal: Boolean): AccumulableInfo = { + new AccumulableInfo(id, name, update, value, internal) + } + def apply(id: Long, name: String, update: Option[String], value: String): AccumulableInfo = { new AccumulableInfo(id, name, update, value, internal = false) } diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 40729fa5a4..a06dc6f709 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -282,7 +282,8 @@ private[spark] object JsonProtocol { ("ID" -> accumulableInfo.id) ~ ("Name" -> accumulableInfo.name) ~ ("Update" -> accumulableInfo.update.map(new JString(_)).getOrElse(JNothing)) ~ - ("Value" -> accumulableInfo.value) + ("Value" -> accumulableInfo.value) ~ + ("Internal" -> accumulableInfo.internal) } def taskMetricsToJson(taskMetrics: TaskMetrics): JValue = { @@ -696,7 +697,8 @@ private[spark] object JsonProtocol { val name = (json \ "Name").extract[String] val update = Utils.jsonOption(json \ "Update").map(_.extract[String]) val value = (json \ "Value").extract[String] - AccumulableInfo(id, name, update, value) + val internal = (json \ "Internal").extractOpt[Boolean].getOrElse(false) + AccumulableInfo(id, name, update, value, internal) } def taskMetricsFromJson(json: JValue): TaskMetrics = { diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index a24bf2931c..f9572921f4 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -364,6 +364,15 @@ class JsonProtocolSuite extends SparkFunSuite { assertEquals(expectedDenied, JsonProtocol.taskEndReasonFromJson(oldDenied)) } + test("AccumulableInfo backward compatibility") { + // "Internal" property of AccumulableInfo were added after 1.5.1. + val accumulableInfo = makeAccumulableInfo(1) + val oldJson = JsonProtocol.accumulableInfoToJson(accumulableInfo) + .removeField({ _._1 == "Internal" }) + val oldInfo = JsonProtocol.accumulableInfoFromJson(oldJson) + assert(false === oldInfo.internal) + } + /** -------------------------- * | Helper test running methods | * --------------------------- */ @@ -723,15 +732,15 @@ class JsonProtocolSuite extends SparkFunSuite { val taskInfo = new TaskInfo(a, b, c, d, "executor", "your kind sir", TaskLocality.NODE_LOCAL, speculative) val (acc1, acc2, acc3) = - (makeAccumulableInfo(1), makeAccumulableInfo(2), makeAccumulableInfo(3)) + (makeAccumulableInfo(1), makeAccumulableInfo(2), makeAccumulableInfo(3, internal = true)) taskInfo.accumulables += acc1 taskInfo.accumulables += acc2 taskInfo.accumulables += acc3 taskInfo } - private def makeAccumulableInfo(id: Int): AccumulableInfo = - AccumulableInfo(id, " Accumulable " + id, Some("delta" + id), "val" + id) + private def makeAccumulableInfo(id: Int, internal: Boolean = false): AccumulableInfo = + AccumulableInfo(id, " Accumulable " + id, Some("delta" + id), "val" + id, internal) /** * Creates a TaskMetrics object describing a task that read data from Hadoop (if hasHadoopInput is @@ -812,13 +821,15 @@ class JsonProtocolSuite extends SparkFunSuite { | "ID": 2, | "Name": "Accumulable2", | "Update": "delta2", - | "Value": "val2" + | "Value": "val2", + | "Internal": false | }, | { | "ID": 1, | "Name": "Accumulable1", | "Update": "delta1", - | "Value": "val1" + | "Value": "val1", + | "Internal": false | } | ] | }, @@ -866,13 +877,15 @@ class JsonProtocolSuite extends SparkFunSuite { | "ID": 2, | "Name": "Accumulable2", | "Update": "delta2", - | "Value": "val2" + | "Value": "val2", + | "Internal": false | }, | { | "ID": 1, | "Name": "Accumulable1", | "Update": "delta1", - | "Value": "val1" + | "Value": "val1", + | "Internal": false | } | ] | } @@ -902,19 +915,22 @@ class JsonProtocolSuite extends SparkFunSuite { | "ID": 1, | "Name": "Accumulable1", | "Update": "delta1", - | "Value": "val1" + | "Value": "val1", + | "Internal": false | }, | { | "ID": 2, | "Name": "Accumulable2", | "Update": "delta2", - | "Value": "val2" + | "Value": "val2", + | "Internal": false | }, | { | "ID": 3, | "Name": "Accumulable3", | "Update": "delta3", - | "Value": "val3" + | "Value": "val3", + | "Internal": true | } | ] | } @@ -942,19 +958,22 @@ class JsonProtocolSuite extends SparkFunSuite { | "ID": 1, | "Name": "Accumulable1", | "Update": "delta1", - | "Value": "val1" + | "Value": "val1", + | "Internal": false | }, | { | "ID": 2, | "Name": "Accumulable2", | "Update": "delta2", - | "Value": "val2" + | "Value": "val2", + | "Internal": false | }, | { | "ID": 3, | "Name": "Accumulable3", | "Update": "delta3", - | "Value": "val3" + | "Value": "val3", + | "Internal": true | } | ] | } @@ -988,19 +1007,22 @@ class JsonProtocolSuite extends SparkFunSuite { | "ID": 1, | "Name": "Accumulable1", | "Update": "delta1", - | "Value": "val1" + | "Value": "val1", + | "Internal": false | }, | { | "ID": 2, | "Name": "Accumulable2", | "Update": "delta2", - | "Value": "val2" + | "Value": "val2", + | "Internal": false | }, | { | "ID": 3, | "Name": "Accumulable3", | "Update": "delta3", - | "Value": "val3" + | "Value": "val3", + | "Internal": true | } | ] | }, @@ -1074,19 +1096,22 @@ class JsonProtocolSuite extends SparkFunSuite { | "ID": 1, | "Name": "Accumulable1", | "Update": "delta1", - | "Value": "val1" + | "Value": "val1", + | "Internal": false | }, | { | "ID": 2, | "Name": "Accumulable2", | "Update": "delta2", - | "Value": "val2" + | "Value": "val2", + | "Internal": false | }, | { | "ID": 3, | "Name": "Accumulable3", | "Update": "delta3", - | "Value": "val3" + | "Value": "val3", + | "Internal": true | } | ] | }, @@ -1157,19 +1182,22 @@ class JsonProtocolSuite extends SparkFunSuite { | "ID": 1, | "Name": "Accumulable1", | "Update": "delta1", - | "Value": "val1" + | "Value": "val1", + | "Internal": false | }, | { | "ID": 2, | "Name": "Accumulable2", | "Update": "delta2", - | "Value": "val2" + | "Value": "val2", + | "Internal": false | }, | { | "ID": 3, | "Name": "Accumulable3", | "Update": "delta3", - | "Value": "val3" + | "Value": "val3", + | "Internal": true | } | ] | }, @@ -1251,13 +1279,15 @@ class JsonProtocolSuite extends SparkFunSuite { | "ID": 2, | "Name": " Accumulable 2", | "Update": "delta2", - | "Value": "val2" + | "Value": "val2", + | "Internal": false | }, | { | "ID": 1, | "Name": " Accumulable 1", | "Update": "delta1", - | "Value": "val1" + | "Value": "val1", + | "Internal": false | } | ] | }, @@ -1309,13 +1339,15 @@ class JsonProtocolSuite extends SparkFunSuite { | "ID": 2, | "Name": " Accumulable 2", | "Update": "delta2", - | "Value": "val2" + | "Value": "val2", + | "Internal": false | }, | { | "ID": 1, | "Name": " Accumulable 1", | "Update": "delta1", - | "Value": "val1" + | "Value": "val1", + | "Internal": false | } | ] | }, @@ -1384,13 +1416,15 @@ class JsonProtocolSuite extends SparkFunSuite { | "ID": 2, | "Name": " Accumulable 2", | "Update": "delta2", - | "Value": "val2" + | "Value": "val2", + | "Internal": false | }, | { | "ID": 1, | "Name": " Accumulable 1", | "Update": "delta1", - | "Value": "val1" + | "Value": "val1", + | "Internal": false | } | ] | }, @@ -1476,13 +1510,15 @@ class JsonProtocolSuite extends SparkFunSuite { | "ID": 2, | "Name": " Accumulable 2", | "Update": "delta2", - | "Value": "val2" + | "Value": "val2", + | "Internal": false | }, | { | "ID": 1, | "Name": " Accumulable 1", | "Update": "delta1", - | "Value": "val1" + | "Value": "val1", + | "Internal": false | } | ] | } |