From c445b3af3b6c524c6113dd036df0ed2f909d184c Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 25 Jun 2014 22:35:03 -0700 Subject: [SPARK-2284][UI] Mark all failed tasks as failures. Previously, only tasks that failed with an ExceptionFailure reason were marked as failures. Author: Reynold Xin Closes #1224 from rxin/SPARK-2284 and squashes the following commits: be79dbd [Reynold Xin] [SPARK-2284][UI] Mark all failed tasks as failures. (cherry picked from commit 4a346e242c3f241c575f35536220df01ad724e23) Signed-off-by: Reynold Xin --- .../apache/spark/ui/jobs/JobProgressListener.scala | 9 ++++--- .../spark/ui/jobs/JobProgressListenerSuite.scala | 30 +++++++++++++++++++++- 2 files changed, 35 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 396cbcbc8d..bfefe4dbc4 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -185,12 +185,15 @@ class JobProgressListener(conf: SparkConf) extends SparkListener { val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) = taskEnd.reason match { + case org.apache.spark.Success => + stageIdToTasksComplete(sid) = stageIdToTasksComplete.getOrElse(sid, 0) + 1 + (None, Option(taskEnd.taskMetrics)) case e: ExceptionFailure => stageIdToTasksFailed(sid) = stageIdToTasksFailed.getOrElse(sid, 0) + 1 (Some(e), e.metrics) - case _ => - stageIdToTasksComplete(sid) = stageIdToTasksComplete.getOrElse(sid, 0) + 1 - (None, Option(taskEnd.taskMetrics)) + case e: org.apache.spark.TaskEndReason => + stageIdToTasksFailed(sid) = stageIdToTasksFailed.getOrElse(sid, 0) + 1 + (None, None) } stageIdToTime.getOrElseUpdate(sid, 0L) diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala 
index 8c06a2d9aa..86a5cda7c8 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.ui.jobs import org.scalatest.FunSuite import org.scalatest.matchers.ShouldMatchers -import org.apache.spark.{LocalSparkContext, SparkConf, Success} +import org.apache.spark._ import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics} import org.apache.spark.scheduler._ import org.apache.spark.util.Utils @@ -101,4 +101,32 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Shou assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-2", fail()) .shuffleRead == 1000) } + + test("test task success vs failure counting for different task end reasons") { + val conf = new SparkConf() + val listener = new JobProgressListener(conf) + val metrics = new TaskMetrics() + val taskInfo = new TaskInfo(1234L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL) + taskInfo.finishTime = 1 + val task = new ShuffleMapTask(0, null, null, 0, null) + val taskType = Utils.getFormattedClassName(task) + + // Go through all the failure cases to make sure we are counting them as failures. + val taskFailedReasons = Seq( + Resubmitted, + new FetchFailed(null, 0, 0, 0), + new ExceptionFailure("Exception", "description", null, None), + TaskResultLost, + TaskKilled, + ExecutorLostFailure, + UnknownReason) + for (reason <- taskFailedReasons) { + listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, reason, taskInfo, metrics)) + assert(listener.stageIdToTasksComplete.get(task.stageId) === None) + } + + // Make sure we count success as success. + listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, metrics)) + assert(listener.stageIdToTasksComplete.get(task.stageId) === Some(1)) + } } -- cgit v1.2.3