aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala22
1 files changed, 14 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index b2aa8bfbe7..2516b674fe 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -374,28 +374,34 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
execSummary.taskTime += info.duration
stageData.numActiveTasks -= 1
- val (errorMessage, metrics): (Option[String], Option[TaskMetrics]) =
+ val (errorMessage, accums): (Option[String], Seq[AccumulableInfo]) =
taskEnd.reason match {
case org.apache.spark.Success =>
stageData.completedIndices.add(info.index)
stageData.numCompleteTasks += 1
- (None, Option(taskEnd.taskMetrics))
- case e: ExceptionFailure => // Handle ExceptionFailure because we might have metrics
+ (None, taskEnd.taskMetrics.accumulatorUpdates())
+ case e: ExceptionFailure => // Handle ExceptionFailure because we might have accumUpdates
stageData.numFailedTasks += 1
- (Some(e.toErrorString), e.metrics)
- case e: TaskFailedReason => // All other failure cases
+ (Some(e.toErrorString), e.accumUpdates)
+ case e: TaskFailedReason => // All other failure cases
stageData.numFailedTasks += 1
- (Some(e.toErrorString), None)
+ (Some(e.toErrorString), Seq.empty[AccumulableInfo])
}
- metrics.foreach { m =>
+ val taskMetrics =
+ if (accums.nonEmpty) {
+ Some(TaskMetrics.fromAccumulatorUpdates(accums))
+ } else {
+ None
+ }
+ taskMetrics.foreach { m =>
val oldMetrics = stageData.taskData.get(info.taskId).flatMap(_.taskMetrics)
updateAggregateMetrics(stageData, info.executorId, m, oldMetrics)
}
val taskData = stageData.taskData.getOrElseUpdate(info.taskId, new TaskUIData(info))
taskData.taskInfo = info
- taskData.taskMetrics = metrics
+ taskData.taskMetrics = taskMetrics
taskData.errorMessage = errorMessage
for (