diff options
Diffstat (limited to 'core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala | 10 |
1 files changed, 9 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala index 24796c1430..0fd58c41cd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala @@ -19,6 +19,7 @@ package org.apache.spark.scheduler import scala.collection.mutable.HashMap +import org.apache.spark.Accumulator import org.apache.spark.annotation.DeveloperApi import org.apache.spark.storage.RDDInfo @@ -35,6 +36,7 @@ class StageInfo( val rddInfos: Seq[RDDInfo], val parentIds: Seq[Int], val details: String, + val internalAccumulators: Seq[Accumulator[_]] = Seq.empty, private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty) { /** When this stage was submitted from the DAGScheduler to a TaskScheduler. */ var submissionTime: Option[Long] = None @@ -42,7 +44,11 @@ class StageInfo( var completionTime: Option[Long] = None /** If the stage failed, the reason why. */ var failureReason: Option[String] = None - /** Terminal values of accumulables updated during this stage. */ + + /** + * Terminal values of accumulables updated during this stage, including all the user-defined + * accumulators. + */ val accumulables = HashMap[Long, AccumulableInfo]() def stageFailed(reason: String) { @@ -75,6 +81,7 @@ private[spark] object StageInfo { stage: Stage, attemptId: Int, numTasks: Option[Int] = None, + internalAccumulators: Seq[Accumulator[_]] = Seq.empty, taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty ): StageInfo = { val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd) @@ -87,6 +94,7 @@ private[spark] object StageInfo { rddInfos, stage.parents.map(_.id), stage.details, + internalAccumulators, taskLocalityPreferences) } } |