diff options
author | GayathriMurali <gayathri.m.softie@gmail.com> | 2016-03-16 09:38:41 +0000 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-03-16 09:39:41 +0000 |
commit | 56d88247f14ca54750816748f5b6b2aca7bc6fea (patch) | |
tree | 325e1778e782ef50464a80203ce17f3c4d752910 /core | |
parent | 3b461d9ecd633c4fd659998b99e700d76f58d18a (diff) | |
download | spark-56d88247f14ca54750816748f5b6b2aca7bc6fea.tar.gz spark-56d88247f14ca54750816748f5b6b2aca7bc6fea.tar.bz2 spark-56d88247f14ca54750816748f5b6b2aca7bc6fea.zip |
[SPARK-13396] Stop using our internal deprecated .metrics on Exceptio…
JIRA: https://issues.apache.org/jira/browse/SPARK-13396
Stop using our internal deprecated .metrics on ExceptionFailure instead use accumUpdates
Author: GayathriMurali <gayathri.m.softie@gmail.com>
Closes #11544 from GayathriMurali/SPARK-13396.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala | 22 |
1 files changed, 14 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index b2aa8bfbe7..2516b674fe 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -374,28 +374,34 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { execSummary.taskTime += info.duration stageData.numActiveTasks -= 1 - val (errorMessage, metrics): (Option[String], Option[TaskMetrics]) = + val (errorMessage, accums): (Option[String], Seq[AccumulableInfo]) = taskEnd.reason match { case org.apache.spark.Success => stageData.completedIndices.add(info.index) stageData.numCompleteTasks += 1 - (None, Option(taskEnd.taskMetrics)) - case e: ExceptionFailure => // Handle ExceptionFailure because we might have metrics + (None, taskEnd.taskMetrics.accumulatorUpdates()) + case e: ExceptionFailure => // Handle ExceptionFailure because we might have accumUpdates stageData.numFailedTasks += 1 - (Some(e.toErrorString), e.metrics) - case e: TaskFailedReason => // All other failure cases + (Some(e.toErrorString), e.accumUpdates) + case e: TaskFailedReason => // All other failure cases stageData.numFailedTasks += 1 - (Some(e.toErrorString), None) + (Some(e.toErrorString), Seq.empty[AccumulableInfo]) } - metrics.foreach { m => + val taskMetrics = + if (accums.nonEmpty) { + Some(TaskMetrics.fromAccumulatorUpdates(accums)) + } else { + None + } + taskMetrics.foreach { m => val oldMetrics = stageData.taskData.get(info.taskId).flatMap(_.taskMetrics) updateAggregateMetrics(stageData, info.executorId, m, oldMetrics) } val taskData = stageData.taskData.getOrElseUpdate(info.taskId, new TaskUIData(info)) taskData.taskInfo = info - taskData.taskMetrics = metrics + taskData.taskMetrics = taskMetrics taskData.errorMessage = errorMessage for ( |