aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala9
-rw-r--r--core/src/main/scala/org/apache/spark/util/JsonProtocol.scala6
-rw-r--r--core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala96
3 files changed, 79 insertions, 32 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala
index b6bff64ee3..146cfb9ba8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala
@@ -46,6 +46,15 @@ class AccumulableInfo private[spark] (
}
object AccumulableInfo {
+ def apply(
+ id: Long,
+ name: String,
+ update: Option[String],
+ value: String,
+ internal: Boolean): AccumulableInfo = {
+ new AccumulableInfo(id, name, update, value, internal)
+ }
+
def apply(id: Long, name: String, update: Option[String], value: String): AccumulableInfo = {
new AccumulableInfo(id, name, update, value, internal = false)
}
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 40729fa5a4..a06dc6f709 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -282,7 +282,8 @@ private[spark] object JsonProtocol {
("ID" -> accumulableInfo.id) ~
("Name" -> accumulableInfo.name) ~
("Update" -> accumulableInfo.update.map(new JString(_)).getOrElse(JNothing)) ~
- ("Value" -> accumulableInfo.value)
+ ("Value" -> accumulableInfo.value) ~
+ ("Internal" -> accumulableInfo.internal)
}
def taskMetricsToJson(taskMetrics: TaskMetrics): JValue = {
@@ -696,7 +697,8 @@ private[spark] object JsonProtocol {
val name = (json \ "Name").extract[String]
val update = Utils.jsonOption(json \ "Update").map(_.extract[String])
val value = (json \ "Value").extract[String]
- AccumulableInfo(id, name, update, value)
+ val internal = (json \ "Internal").extractOpt[Boolean].getOrElse(false)
+ AccumulableInfo(id, name, update, value, internal)
}
def taskMetricsFromJson(json: JValue): TaskMetrics = {
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index a24bf2931c..f9572921f4 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -364,6 +364,15 @@ class JsonProtocolSuite extends SparkFunSuite {
assertEquals(expectedDenied, JsonProtocol.taskEndReasonFromJson(oldDenied))
}
+ test("AccumulableInfo backward compatibility") {
+ // "Internal" property of AccumulableInfo were added after 1.5.1.
+ val accumulableInfo = makeAccumulableInfo(1)
+ val oldJson = JsonProtocol.accumulableInfoToJson(accumulableInfo)
+ .removeField({ _._1 == "Internal" })
+ val oldInfo = JsonProtocol.accumulableInfoFromJson(oldJson)
+ assert(false === oldInfo.internal)
+ }
+
/** -------------------------- *
| Helper test running methods |
* --------------------------- */
@@ -723,15 +732,15 @@ class JsonProtocolSuite extends SparkFunSuite {
val taskInfo = new TaskInfo(a, b, c, d, "executor", "your kind sir", TaskLocality.NODE_LOCAL,
speculative)
val (acc1, acc2, acc3) =
- (makeAccumulableInfo(1), makeAccumulableInfo(2), makeAccumulableInfo(3))
+ (makeAccumulableInfo(1), makeAccumulableInfo(2), makeAccumulableInfo(3, internal = true))
taskInfo.accumulables += acc1
taskInfo.accumulables += acc2
taskInfo.accumulables += acc3
taskInfo
}
- private def makeAccumulableInfo(id: Int): AccumulableInfo =
- AccumulableInfo(id, " Accumulable " + id, Some("delta" + id), "val" + id)
+ private def makeAccumulableInfo(id: Int, internal: Boolean = false): AccumulableInfo =
+ AccumulableInfo(id, " Accumulable " + id, Some("delta" + id), "val" + id, internal)
/**
* Creates a TaskMetrics object describing a task that read data from Hadoop (if hasHadoopInput is
@@ -812,13 +821,15 @@ class JsonProtocolSuite extends SparkFunSuite {
| "ID": 2,
| "Name": "Accumulable2",
| "Update": "delta2",
- | "Value": "val2"
+ | "Value": "val2",
+ | "Internal": false
| },
| {
| "ID": 1,
| "Name": "Accumulable1",
| "Update": "delta1",
- | "Value": "val1"
+ | "Value": "val1",
+ | "Internal": false
| }
| ]
| },
@@ -866,13 +877,15 @@ class JsonProtocolSuite extends SparkFunSuite {
| "ID": 2,
| "Name": "Accumulable2",
| "Update": "delta2",
- | "Value": "val2"
+ | "Value": "val2",
+ | "Internal": false
| },
| {
| "ID": 1,
| "Name": "Accumulable1",
| "Update": "delta1",
- | "Value": "val1"
+ | "Value": "val1",
+ | "Internal": false
| }
| ]
| }
@@ -902,19 +915,22 @@ class JsonProtocolSuite extends SparkFunSuite {
| "ID": 1,
| "Name": "Accumulable1",
| "Update": "delta1",
- | "Value": "val1"
+ | "Value": "val1",
+ | "Internal": false
| },
| {
| "ID": 2,
| "Name": "Accumulable2",
| "Update": "delta2",
- | "Value": "val2"
+ | "Value": "val2",
+ | "Internal": false
| },
| {
| "ID": 3,
| "Name": "Accumulable3",
| "Update": "delta3",
- | "Value": "val3"
+ | "Value": "val3",
+ | "Internal": true
| }
| ]
| }
@@ -942,19 +958,22 @@ class JsonProtocolSuite extends SparkFunSuite {
| "ID": 1,
| "Name": "Accumulable1",
| "Update": "delta1",
- | "Value": "val1"
+ | "Value": "val1",
+ | "Internal": false
| },
| {
| "ID": 2,
| "Name": "Accumulable2",
| "Update": "delta2",
- | "Value": "val2"
+ | "Value": "val2",
+ | "Internal": false
| },
| {
| "ID": 3,
| "Name": "Accumulable3",
| "Update": "delta3",
- | "Value": "val3"
+ | "Value": "val3",
+ | "Internal": true
| }
| ]
| }
@@ -988,19 +1007,22 @@ class JsonProtocolSuite extends SparkFunSuite {
| "ID": 1,
| "Name": "Accumulable1",
| "Update": "delta1",
- | "Value": "val1"
+ | "Value": "val1",
+ | "Internal": false
| },
| {
| "ID": 2,
| "Name": "Accumulable2",
| "Update": "delta2",
- | "Value": "val2"
+ | "Value": "val2",
+ | "Internal": false
| },
| {
| "ID": 3,
| "Name": "Accumulable3",
| "Update": "delta3",
- | "Value": "val3"
+ | "Value": "val3",
+ | "Internal": true
| }
| ]
| },
@@ -1074,19 +1096,22 @@ class JsonProtocolSuite extends SparkFunSuite {
| "ID": 1,
| "Name": "Accumulable1",
| "Update": "delta1",
- | "Value": "val1"
+ | "Value": "val1",
+ | "Internal": false
| },
| {
| "ID": 2,
| "Name": "Accumulable2",
| "Update": "delta2",
- | "Value": "val2"
+ | "Value": "val2",
+ | "Internal": false
| },
| {
| "ID": 3,
| "Name": "Accumulable3",
| "Update": "delta3",
- | "Value": "val3"
+ | "Value": "val3",
+ | "Internal": true
| }
| ]
| },
@@ -1157,19 +1182,22 @@ class JsonProtocolSuite extends SparkFunSuite {
| "ID": 1,
| "Name": "Accumulable1",
| "Update": "delta1",
- | "Value": "val1"
+ | "Value": "val1",
+ | "Internal": false
| },
| {
| "ID": 2,
| "Name": "Accumulable2",
| "Update": "delta2",
- | "Value": "val2"
+ | "Value": "val2",
+ | "Internal": false
| },
| {
| "ID": 3,
| "Name": "Accumulable3",
| "Update": "delta3",
- | "Value": "val3"
+ | "Value": "val3",
+ | "Internal": true
| }
| ]
| },
@@ -1251,13 +1279,15 @@ class JsonProtocolSuite extends SparkFunSuite {
| "ID": 2,
| "Name": " Accumulable 2",
| "Update": "delta2",
- | "Value": "val2"
+ | "Value": "val2",
+ | "Internal": false
| },
| {
| "ID": 1,
| "Name": " Accumulable 1",
| "Update": "delta1",
- | "Value": "val1"
+ | "Value": "val1",
+ | "Internal": false
| }
| ]
| },
@@ -1309,13 +1339,15 @@ class JsonProtocolSuite extends SparkFunSuite {
| "ID": 2,
| "Name": " Accumulable 2",
| "Update": "delta2",
- | "Value": "val2"
+ | "Value": "val2",
+ | "Internal": false
| },
| {
| "ID": 1,
| "Name": " Accumulable 1",
| "Update": "delta1",
- | "Value": "val1"
+ | "Value": "val1",
+ | "Internal": false
| }
| ]
| },
@@ -1384,13 +1416,15 @@ class JsonProtocolSuite extends SparkFunSuite {
| "ID": 2,
| "Name": " Accumulable 2",
| "Update": "delta2",
- | "Value": "val2"
+ | "Value": "val2",
+ | "Internal": false
| },
| {
| "ID": 1,
| "Name": " Accumulable 1",
| "Update": "delta1",
- | "Value": "val1"
+ | "Value": "val1",
+ | "Internal": false
| }
| ]
| },
@@ -1476,13 +1510,15 @@ class JsonProtocolSuite extends SparkFunSuite {
| "ID": 2,
| "Name": " Accumulable 2",
| "Update": "delta2",
- | "Value": "val2"
+ | "Value": "val2",
+ | "Internal": false
| },
| {
| "ID": 1,
| "Name": " Accumulable 1",
| "Update": "delta1",
- | "Value": "val1"
+ | "Value": "val1",
+ | "Internal": false
| }
| ]
| }