diff options
Diffstat (limited to 'core/src/test')
-rw-r--r-- | core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala | 30 |
1 files changed, 29 insertions, 1 deletions
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index c3a14f48de..e0fec6a068 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.ui.jobs import org.scalatest.FunSuite import org.scalatest.Matchers -import org.apache.spark.{LocalSparkContext, SparkConf, Success} +import org.apache.spark._ import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics} import org.apache.spark.scheduler._ import org.apache.spark.util.Utils @@ -101,4 +101,32 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-2", fail()) .shuffleRead == 1000) } + + test("test task success vs failure counting for different task end reasons") { + val conf = new SparkConf() + val listener = new JobProgressListener(conf) + val metrics = new TaskMetrics() + val taskInfo = new TaskInfo(1234L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL) + taskInfo.finishTime = 1 + val task = new ShuffleMapTask(0, null, null, 0, null) + val taskType = Utils.getFormattedClassName(task) + + // Go through all the failure cases to make sure we are counting them as failures. + val taskFailedReasons = Seq( + Resubmitted, + new FetchFailed(null, 0, 0, 0), + new ExceptionFailure("Exception", "description", null, None), + TaskResultLost, + TaskKilled, + ExecutorLostFailure, + UnknownReason) + for (reason <- taskFailedReasons) { + listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, reason, taskInfo, metrics)) + assert(listener.stageIdToTasksComplete.get(task.stageId) === None) + } + + // Make sure we count success as success. + listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, metrics)) + assert(listener.stageIdToTasksComplete.get(task.stageId) === Some(1)) + } } |