author    Reynold Xin <rxin@apache.org>  2014-06-25 22:35:03 -0700
committer Reynold Xin <rxin@apache.org>  2014-06-25 22:35:03 -0700
commit    4a346e242c3f241c575f35536220df01ad724e23 (patch)
tree      7ee3877a961cf169b2cbbdea475a7cf34ef107fa /core/src/test
parent    b88a59a66845b8935b22f06fc96d16841ed20c94 (diff)
[SPARK-2284][UI] Mark all failed tasks as failures.
Previously, only tasks that failed with an ExceptionFailure reason were marked as failures; with this change, every non-Success task end reason is counted as a failure in the UI.

Author: Reynold Xin <rxin@apache.org>

Closes #1224 from rxin/SPARK-2284 and squashes the following commits:

be79dbd [Reynold Xin] [SPARK-2284][UI] Mark all failed tasks as failures.
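The fix reduces to a single dispatch in the listener: Success increments the completed-task count, and every other TaskEndReason increments the failed-task count. Below is a minimal standalone sketch of that pattern; the types and the FailureCountingListener name are illustrative stand-ins, not Spark's actual JobProgressListener code.

import scala.collection.mutable

// Illustrative stand-ins for Spark's TaskEndReason hierarchy.
sealed trait TaskEndReason
case object Success extends TaskEndReason
case object Resubmitted extends TaskEndReason
case object TaskResultLost extends TaskEndReason
case object TaskKilled extends TaskEndReason
case object ExecutorLostFailure extends TaskEndReason
case object UnknownReason extends TaskEndReason
final case class ExceptionFailure(className: String, description: String) extends TaskEndReason

class FailureCountingListener {
  val stageIdToTasksComplete = mutable.HashMap[Int, Int]()
  val stageIdToTasksFailed = mutable.HashMap[Int, Int]()

  def onTaskEnd(stageId: Int, reason: TaskEndReason): Unit = reason match {
    case Success =>
      stageIdToTasksComplete(stageId) = stageIdToTasksComplete.getOrElse(stageId, 0) + 1
    case _ =>
      // The point of SPARK-2284: any non-Success reason counts as a failure,
      // not just ExceptionFailure.
      stageIdToTasksFailed(stageId) = stageIdToTasksFailed.getOrElse(stageId, 0) + 1
  }
}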
Diffstat (limited to 'core/src/test')
-rw-r--r-- core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala | 30
1 file changed, 29 insertions(+), 1 deletion(-)
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index c3a14f48de..e0fec6a068 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.ui.jobs
import org.scalatest.FunSuite
import org.scalatest.Matchers
-import org.apache.spark.{LocalSparkContext, SparkConf, Success}
+import org.apache.spark._
import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics}
import org.apache.spark.scheduler._
import org.apache.spark.util.Utils
@@ -101,4 +101,32 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matchers
assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-2", fail())
.shuffleRead == 1000)
}
+
+ test("test task success vs failure counting for different task end reasons") {
+ val conf = new SparkConf()
+ val listener = new JobProgressListener(conf)
+ val metrics = new TaskMetrics()
+ val taskInfo = new TaskInfo(1234L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
+ taskInfo.finishTime = 1
+ val task = new ShuffleMapTask(0, null, null, 0, null)
+ val taskType = Utils.getFormattedClassName(task)
+
+ // Go through all the failure cases to make sure we are counting them as failures.
+ val taskFailedReasons = Seq(
+ Resubmitted,
+ new FetchFailed(null, 0, 0, 0),
+ new ExceptionFailure("Exception", "description", null, None),
+ TaskResultLost,
+ TaskKilled,
+ ExecutorLostFailure,
+ UnknownReason)
+ for (reason <- taskFailedReasons) {
+ listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, reason, taskInfo, metrics))
+ assert(listener.stageIdToTasksComplete.get(task.stageId) === None)
+ }
+
+ // Make sure we count success as success.
+ listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, metrics))
+ assert(listener.stageIdToTasksComplete.get(task.stageId) === Some(1))
+ }
}
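The new test drives the real listener through every failure reason and asserts that stageIdToTasksComplete stays empty until a Success event arrives. The sketch after the commit message behaves the same way; a quick check against it (hypothetical names, mirroring the test's structure):

val l = new FailureCountingListener
val failures = Seq(Resubmitted, TaskResultLost, TaskKilled, ExecutorLostFailure,
  UnknownReason, ExceptionFailure("Exception", "description"))
failures.foreach(l.onTaskEnd(0, _))
assert(l.stageIdToTasksComplete.get(0) == None)    // only failures so far
l.onTaskEnd(0, Success)
assert(l.stageIdToTasksComplete.get(0) == Some(1)) // one completed task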