author    Reynold Xin <rxin@apache.org>  2014-06-25 22:35:03 -0700
committer Reynold Xin <rxin@apache.org>  2014-06-25 22:35:17 -0700
commit    c445b3af3b6c524c6113dd036df0ed2f909d184c (patch)
tree      92887d2237e4d7af2f88c3645f40691319dccb75
parent    fa167194ce1b5898e4d7232346c9f86b2897a722 (diff)
[SPARK-2284][UI] Mark all failed tasks as failures.
Previously, only tasks that failed with an ExceptionFailure reason were marked as failures.

Author: Reynold Xin <rxin@apache.org>

Closes #1224 from rxin/SPARK-2284 and squashes the following commits:

be79dbd [Reynold Xin] [SPARK-2284][UI] Mark all failed tasks as failures.

(cherry picked from commit 4a346e242c3f241c575f35536220df01ad724e23)
Signed-off-by: Reynold Xin <rxin@apache.org>
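The essence of the fix is the shape of the pattern match in the listener: only Success increments the completed counter, and every other TaskEndReason increments the failed counter. Below is a minimal, self-contained Scala sketch of that counting logic, using simplified stand-in types rather than the real org.apache.spark classes:

    object TaskCountingSketch {
      // Stand-in types: a simplified model of Spark's TaskEndReason
      // hierarchy, not the actual org.apache.spark classes.
      sealed trait TaskEndReason
      case object Success extends TaskEndReason
      final case class ExceptionFailure(description: String) extends TaskEndReason
      case object TaskResultLost extends TaskEndReason
      case object TaskKilled extends TaskEndReason

      var tasksComplete = 0
      var tasksFailed = 0

      def onTaskEnd(reason: TaskEndReason): Unit = reason match {
        case Success =>
          tasksComplete += 1   // only a successful task counts as complete
        case ExceptionFailure(_) =>
          tasksFailed += 1     // previously the only reason counted as failed
        case _ =>
          tasksFailed += 1     // the fix: every other end reason is a failure
      }

      def main(args: Array[String]): Unit = {
        Seq(Success, ExceptionFailure("boom"), TaskResultLost, TaskKilled).foreach(onTaskEnd)
        assert(tasksComplete == 1 && tasksFailed == 3)
        println(s"complete=$tasksComplete failed=$tasksFailed")
      }
    }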
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala        9
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala  30
2 files changed, 35 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 396cbcbc8d..bfefe4dbc4 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -185,12 +185,15 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
 
       val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
         taskEnd.reason match {
+          case org.apache.spark.Success =>
+            stageIdToTasksComplete(sid) = stageIdToTasksComplete.getOrElse(sid, 0) + 1
+            (None, Option(taskEnd.taskMetrics))
           case e: ExceptionFailure =>
             stageIdToTasksFailed(sid) = stageIdToTasksFailed.getOrElse(sid, 0) + 1
             (Some(e), e.metrics)
-          case _ =>
-            stageIdToTasksComplete(sid) = stageIdToTasksComplete.getOrElse(sid, 0) + 1
-            (None, Option(taskEnd.taskMetrics))
+          case e: org.apache.spark.TaskEndReason =>
+            stageIdToTasksFailed(sid) = stageIdToTasksFailed.getOrElse(sid, 0) + 1
+            (None, None)
         }
 
       stageIdToTime.getOrElseUpdate(sid, 0L)
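Note the effect of the branch ordering: task metrics are still surfaced for Success (from taskEnd.taskMetrics) and for ExceptionFailure (which carries its own metrics), while every remaining end reason, which provides no metrics in this code path, is now counted as a failure with (None, None) instead of being miscounted as a completion.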
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 8c06a2d9aa..86a5cda7c8 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.ui.jobs
 import org.scalatest.FunSuite
 import org.scalatest.matchers.ShouldMatchers
 
-import org.apache.spark.{LocalSparkContext, SparkConf, Success}
+import org.apache.spark._
 import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics}
 import org.apache.spark.scheduler._
 import org.apache.spark.util.Utils
@@ -101,4 +101,32 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Shou
     assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-2", fail())
       .shuffleRead == 1000)
   }
+
+  test("test task success vs failure counting for different task end reasons") {
+    val conf = new SparkConf()
+    val listener = new JobProgressListener(conf)
+    val metrics = new TaskMetrics()
+    val taskInfo = new TaskInfo(1234L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
+    taskInfo.finishTime = 1
+    val task = new ShuffleMapTask(0, null, null, 0, null)
+    val taskType = Utils.getFormattedClassName(task)
+
+    // Go through all the failure cases to make sure we are counting them as failures.
+    val taskFailedReasons = Seq(
+      Resubmitted,
+      new FetchFailed(null, 0, 0, 0),
+      new ExceptionFailure("Exception", "description", null, None),
+      TaskResultLost,
+      TaskKilled,
+      ExecutorLostFailure,
+      UnknownReason)
+    for (reason <- taskFailedReasons) {
+      listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, reason, taskInfo, metrics))
+      assert(listener.stageIdToTasksComplete.get(task.stageId) === None)
+    }
+
+    // Make sure we count success as success.
+    listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, metrics))
+    assert(listener.stageIdToTasksComplete.get(task.stageId) === Some(1))
+  }
 }