aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorGayathriMurali <gayathri.m.softie@gmail.com>2016-03-16 09:38:41 +0000
committerSean Owen <sowen@cloudera.com>2016-03-16 09:39:41 +0000
commit56d88247f14ca54750816748f5b6b2aca7bc6fea (patch)
tree325e1778e782ef50464a80203ce17f3c4d752910 /core
parent3b461d9ecd633c4fd659998b99e700d76f58d18a (diff)
downloadspark-56d88247f14ca54750816748f5b6b2aca7bc6fea.tar.gz
spark-56d88247f14ca54750816748f5b6b2aca7bc6fea.tar.bz2
spark-56d88247f14ca54750816748f5b6b2aca7bc6fea.zip
[SPARK-13396] Stop using our internal deprecated .metrics on Exceptio…
JIRA: https://issues.apache.org/jira/browse/SPARK-13396 Stop using our internal deprecated .metrics on ExceptionFailure instead use accumUpdates Author: GayathriMurali <gayathri.m.softie@gmail.com> Closes #11544 from GayathriMurali/SPARK-13396.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala22
1 files changed, 14 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index b2aa8bfbe7..2516b674fe 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -374,28 +374,34 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
execSummary.taskTime += info.duration
stageData.numActiveTasks -= 1
- val (errorMessage, metrics): (Option[String], Option[TaskMetrics]) =
+ val (errorMessage, accums): (Option[String], Seq[AccumulableInfo]) =
taskEnd.reason match {
case org.apache.spark.Success =>
stageData.completedIndices.add(info.index)
stageData.numCompleteTasks += 1
- (None, Option(taskEnd.taskMetrics))
- case e: ExceptionFailure => // Handle ExceptionFailure because we might have metrics
+ (None, taskEnd.taskMetrics.accumulatorUpdates())
+ case e: ExceptionFailure => // Handle ExceptionFailure because we might have accumUpdates
stageData.numFailedTasks += 1
- (Some(e.toErrorString), e.metrics)
- case e: TaskFailedReason => // All other failure cases
+ (Some(e.toErrorString), e.accumUpdates)
+ case e: TaskFailedReason => // All other failure cases
stageData.numFailedTasks += 1
- (Some(e.toErrorString), None)
+ (Some(e.toErrorString), Seq.empty[AccumulableInfo])
}
- metrics.foreach { m =>
+ val taskMetrics =
+ if (accums.nonEmpty) {
+ Some(TaskMetrics.fromAccumulatorUpdates(accums))
+ } else {
+ None
+ }
+ taskMetrics.foreach { m =>
val oldMetrics = stageData.taskData.get(info.taskId).flatMap(_.taskMetrics)
updateAggregateMetrics(stageData, info.executorId, m, oldMetrics)
}
val taskData = stageData.taskData.getOrElseUpdate(info.taskId, new TaskUIData(info))
taskData.taskInfo = info
- taskData.taskMetrics = metrics
+ taskData.taskMetrics = taskMetrics
taskData.errorMessage = errorMessage
for (