aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/TaskEndReason.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/util/JsonProtocol.scala8
-rw-r--r--core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala5
5 files changed, 13 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 8f0c5e7841..202fba699a 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -117,8 +117,8 @@ case object TaskKilled extends TaskFailedReason {
* the task crashed the JVM.
*/
@DeveloperApi
-case object ExecutorLostFailure extends TaskFailedReason {
- override def toErrorString: String = "ExecutorLostFailure (executor lost)"
+case class ExecutorLostFailure(execId: String) extends TaskFailedReason {
+ override def toErrorString: String = s"ExecutorLostFailure (executor ${execId} lost)"
}
/**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index a976734007..d8fb640350 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -732,7 +732,7 @@ private[spark] class TaskSetManager(
}
// Also re-enqueue any tasks that were running on the node
for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
- handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure)
+ handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure(execId))
}
// recalculate valid locality levels and waits when executor is lost
recomputeLocality()
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 5b2e7d3a7e..43c7fba066 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -272,7 +272,7 @@ private[spark] object JsonProtocol {
def taskEndReasonToJson(taskEndReason: TaskEndReason): JValue = {
val reason = Utils.getFormattedClassName(taskEndReason)
- val json = taskEndReason match {
+ val json: JObject = taskEndReason match {
case fetchFailed: FetchFailed =>
val blockManagerAddress = Option(fetchFailed.bmAddress).
map(blockManagerIdToJson).getOrElse(JNothing)
@@ -287,6 +287,8 @@ private[spark] object JsonProtocol {
("Description" -> exceptionFailure.description) ~
("Stack Trace" -> stackTrace) ~
("Metrics" -> metrics)
+ case ExecutorLostFailure(executorId) =>
+ ("Executor ID" -> executorId)
case _ => Utils.emptyJson
}
("Reason" -> reason) ~ json
@@ -636,7 +638,9 @@ private[spark] object JsonProtocol {
new ExceptionFailure(className, description, stackTrace, metrics)
case `taskResultLost` => TaskResultLost
case `taskKilled` => TaskKilled
- case `executorLostFailure` => ExecutorLostFailure
+ case `executorLostFailure` =>
+ val executorId = Utils.jsonOption(json \ "Executor ID").map(_.extract[String])
+ ExecutorLostFailure(executorId.getOrElse("Unknown"))
case `unknownReason` => UnknownReason
}
}
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 3370dd4156..6567c5ab83 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -119,7 +119,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
new ExceptionFailure("Exception", "description", null, None),
TaskResultLost,
TaskKilled,
- ExecutorLostFailure,
+ ExecutorLostFailure("0"),
UnknownReason)
var failCount = 0
for (reason <- taskFailedReasons) {
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index f1f88c5fd3..d235d7a0ed 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -115,7 +115,7 @@ class JsonProtocolSuite extends FunSuite {
testTaskEndReason(exceptionFailure)
testTaskEndReason(TaskResultLost)
testTaskEndReason(TaskKilled)
- testTaskEndReason(ExecutorLostFailure)
+ testTaskEndReason(ExecutorLostFailure("100"))
testTaskEndReason(UnknownReason)
// BlockId
@@ -403,7 +403,8 @@ class JsonProtocolSuite extends FunSuite {
assertOptionEquals(r1.metrics, r2.metrics, assertTaskMetricsEquals)
case (TaskResultLost, TaskResultLost) =>
case (TaskKilled, TaskKilled) =>
- case (ExecutorLostFailure, ExecutorLostFailure) =>
+ case (ExecutorLostFailure(execId1), ExecutorLostFailure(execId2)) =>
+ assert(execId1 === execId2)
case (UnknownReason, UnknownReason) =>
case _ => fail("Task end reasons don't match in types!")
}