From 56d88247f14ca54750816748f5b6b2aca7bc6fea Mon Sep 17 00:00:00 2001 From: GayathriMurali Date: Wed, 16 Mar 2016 09:38:41 +0000 Subject: [SPARK-13396] Stop using our internal deprecated .metrics on Exceptio… MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit JIRA: https://issues.apache.org/jira/browse/SPARK-13396 Stop using our internal deprecated .metrics on ExceptionFailure instead use accumUpdates Author: GayathriMurali Closes #11544 from GayathriMurali/SPARK-13396. --- .../apache/spark/ui/jobs/JobProgressListener.scala | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index b2aa8bfbe7..2516b674fe 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -374,28 +374,34 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { execSummary.taskTime += info.duration stageData.numActiveTasks -= 1 - val (errorMessage, metrics): (Option[String], Option[TaskMetrics]) = + val (errorMessage, accums): (Option[String], Seq[AccumulableInfo]) = taskEnd.reason match { case org.apache.spark.Success => stageData.completedIndices.add(info.index) stageData.numCompleteTasks += 1 - (None, Option(taskEnd.taskMetrics)) - case e: ExceptionFailure => // Handle ExceptionFailure because we might have metrics + (None, taskEnd.taskMetrics.accumulatorUpdates()) + case e: ExceptionFailure => // Handle ExceptionFailure because we might have accumUpdates stageData.numFailedTasks += 1 - (Some(e.toErrorString), e.metrics) - case e: TaskFailedReason => // All other failure cases + (Some(e.toErrorString), e.accumUpdates) + case e: TaskFailedReason => // All other failure cases stageData.numFailedTasks += 1 - (Some(e.toErrorString), None) + (Some(e.toErrorString), Seq.empty[AccumulableInfo]) } - metrics.foreach { m => + val taskMetrics = + if (accums.nonEmpty) { + Some(TaskMetrics.fromAccumulatorUpdates(accums)) + } else { + None + } + taskMetrics.foreach { m => val oldMetrics = stageData.taskData.get(info.taskId).flatMap(_.taskMetrics) updateAggregateMetrics(stageData, info.executorId, m, oldMetrics) } val taskData = stageData.taskData.getOrElseUpdate(info.taskId, new TaskUIData(info)) taskData.taskInfo = info - taskData.taskMetrics = metrics + taskData.taskMetrics = taskMetrics taskData.errorMessage = errorMessage for ( -- cgit v1.2.3