aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorAndrew Or <andrew@databricks.com>2015-09-22 16:35:43 -0700
committerAndrew Or <andrew@databricks.com>2015-09-22 16:35:43 -0700
commit61d4c07f4becb42f054e588be56ed13239644410 (patch)
tree15bcc3344a554f2ebb82581a36e7c02e2b44b52e /core
parenta96ba40f7ee1352288ea676d8844e1c8174202eb (diff)
downloadspark-61d4c07f4becb42f054e588be56ed13239644410.tar.gz
spark-61d4c07f4becb42f054e588be56ed13239644410.tar.bz2
spark-61d4c07f4becb42f054e588be56ed13239644410.zip
[SPARK-10640] History server fails to parse TaskCommitDenied
... simply because the code is missing! Author: Andrew Or <andrew@databricks.com> Closes #8828 from andrewor14/task-end-reason-json.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/TaskEndReason.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/util/JsonProtocol.scala13
-rw-r--r--core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala17
3 files changed, 35 insertions, 1 deletion
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 7137246bc3..9335c5f416 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -17,13 +17,17 @@
package org.apache.spark
-import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
+import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.Utils
+// ==============================================================================================
+// NOTE: new task end reasons MUST be accompanied with serialization logic in util.JsonProtocol!
+// ==============================================================================================
+
/**
* :: DeveloperApi ::
* Various possible reasons why a task ended. The low-level TaskScheduler is supposed to retry
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 99614a786b..40729fa5a4 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -362,6 +362,10 @@ private[spark] object JsonProtocol {
("Stack Trace" -> stackTrace) ~
("Full Stack Trace" -> exceptionFailure.fullStackTrace) ~
("Metrics" -> metrics)
+ case taskCommitDenied: TaskCommitDenied =>
+ ("Job ID" -> taskCommitDenied.jobID) ~
+ ("Partition ID" -> taskCommitDenied.partitionID) ~
+ ("Attempt Number" -> taskCommitDenied.attemptNumber)
case ExecutorLostFailure(executorId, isNormalExit) =>
("Executor ID" -> executorId) ~
("Normal Exit" -> isNormalExit)
@@ -770,6 +774,7 @@ private[spark] object JsonProtocol {
val exceptionFailure = Utils.getFormattedClassName(ExceptionFailure)
val taskResultLost = Utils.getFormattedClassName(TaskResultLost)
val taskKilled = Utils.getFormattedClassName(TaskKilled)
+ val taskCommitDenied = Utils.getFormattedClassName(TaskCommitDenied)
val executorLostFailure = Utils.getFormattedClassName(ExecutorLostFailure)
val unknownReason = Utils.getFormattedClassName(UnknownReason)
@@ -794,6 +799,14 @@ private[spark] object JsonProtocol {
ExceptionFailure(className, description, stackTrace, fullStackTrace, metrics, None)
case `taskResultLost` => TaskResultLost
case `taskKilled` => TaskKilled
+ case `taskCommitDenied` =>
+ // Unfortunately, the `TaskCommitDenied` message was introduced in 1.3.0 but the JSON
+ // de/serialization logic was not added until 1.5.1. To provide backward compatibility
+ // for reading those logs, we need to provide default values for all the fields.
+ val jobId = Utils.jsonOption(json \ "Job ID").map(_.extract[Int]).getOrElse(-1)
+ val partitionId = Utils.jsonOption(json \ "Partition ID").map(_.extract[Int]).getOrElse(-1)
+ val attemptNo = Utils.jsonOption(json \ "Attempt Number").map(_.extract[Int]).getOrElse(-1)
+ TaskCommitDenied(jobId, partitionId, attemptNo)
case `executorLostFailure` =>
val isNormalExit = Utils.jsonOption(json \ "Normal Exit").
map(_.extract[Boolean])
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 143c1b901d..a24bf2931c 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -151,6 +151,7 @@ class JsonProtocolSuite extends SparkFunSuite {
testTaskEndReason(exceptionFailure)
testTaskEndReason(TaskResultLost)
testTaskEndReason(TaskKilled)
+ testTaskEndReason(TaskCommitDenied(2, 3, 4))
testTaskEndReason(ExecutorLostFailure("100", true))
testTaskEndReason(UnknownReason)
@@ -352,6 +353,17 @@ class JsonProtocolSuite extends SparkFunSuite {
assertEquals(expectedStageInfo, JsonProtocol.stageInfoFromJson(oldStageInfo))
}
+ // `TaskCommitDenied` was added in 1.3.0 but JSON de/serialization logic was added in 1.5.1
+ test("TaskCommitDenied backward compatibility") {
+ val denied = TaskCommitDenied(1, 2, 3)
+ val oldDenied = JsonProtocol.taskEndReasonToJson(denied)
+ .removeField({ _._1 == "Job ID" })
+ .removeField({ _._1 == "Partition ID" })
+ .removeField({ _._1 == "Attempt Number" })
+ val expectedDenied = TaskCommitDenied(-1, -1, -1)
+ assertEquals(expectedDenied, JsonProtocol.taskEndReasonFromJson(oldDenied))
+ }
+
/** -------------------------- *
| Helper test running methods |
* --------------------------- */
@@ -577,6 +589,11 @@ class JsonProtocolSuite extends SparkFunSuite {
assertOptionEquals(r1.metrics, r2.metrics, assertTaskMetricsEquals)
case (TaskResultLost, TaskResultLost) =>
case (TaskKilled, TaskKilled) =>
+ case (TaskCommitDenied(jobId1, partitionId1, attemptNumber1),
+ TaskCommitDenied(jobId2, partitionId2, attemptNumber2)) =>
+ assert(jobId1 === jobId2)
+ assert(partitionId1 === partitionId2)
+ assert(attemptNumber1 === attemptNumber2)
case (ExecutorLostFailure(execId1, isNormalExit1),
ExecutorLostFailure(execId2, isNormalExit2)) =>
assert(execId1 === execId2)